2607 lines
100 KiB
Python
2607 lines
100 KiB
Python
"""
|
|
Plain FastAPI admin UI backed by SQLAlchemy and Redis sessions.
|
|
|
|
This replaces the previous fastapi-admin runtime path, which depended on
|
|
Tortoise-oriented internals that do not match this project.
|
|
"""
|
|
|
|
import secrets
|
|
import uuid
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from html import escape, unescape
|
|
import json
|
|
import re
|
|
from typing import Any
|
|
|
|
import aioredis
|
|
from fastapi import APIRouter, Depends, File, Form, Request, UploadFile
|
|
from sqlalchemy import func, select
|
|
from sqlalchemy.exc import IntegrityError
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from starlette.responses import HTMLResponse, RedirectResponse
|
|
from starlette.status import HTTP_303_SEE_OTHER, HTTP_401_UNAUTHORIZED
|
|
|
|
from app.core.config import get_settings
|
|
from app.database import get_db
|
|
from app.models import (
|
|
AIGenerationRun,
|
|
Item,
|
|
Session,
|
|
Tryout,
|
|
TryoutImportSnapshot,
|
|
TryoutSnapshotQuestion,
|
|
UserAnswer,
|
|
Website,
|
|
)
|
|
from app.services.ai_generation import (
|
|
SUPPORTED_MODELS,
|
|
create_generation_run,
|
|
generate_questions_batch,
|
|
get_ai_stats,
|
|
save_ai_question,
|
|
validate_ai_model,
|
|
)
|
|
from app.services.irt_calibration import get_calibration_status
|
|
from app.services.tryout_json_import import (
|
|
TryoutImportError,
|
|
import_tryout_json_snapshot,
|
|
preview_tryout_json_import,
|
|
)
|
|
|
|
# Settings are resolved once at import time and shared by the helpers below.
settings = get_settings()
# Every route registered on this router is served under the /admin prefix.
router = APIRouter(prefix="/admin", tags=["admin-web"])

# Cookie that carries the admin session token.
SESSION_COOKIE: str = "access_token"
# Redis key prefix; the session token is appended to build the lookup key.
SESSION_PREFIX: str = "admin:session:"
# NOTE(review): presumably the Redis key prefix and lifetime (seconds) for
# cached import previews -- their use is not visible in this part of the file.
IMPORT_PREVIEW_PREFIX: str = "admin:import-preview:"
IMPORT_PREVIEW_TTL_SECONDS: int = 900

# Shared Redis client: set by configure_admin_web(), cleared by
# shutdown_admin_web(). None means the admin UI is not configured.
_admin_redis: Any = None
|
|
|
|
|
|
@dataclass
class AdminPrincipal:
    """Identity of an authenticated admin user."""

    # Username read back from the admin's session entry.
    username: str
|
|
|
|
|
|
async def configure_admin_web() -> None:
    """Create the shared Redis client used for admin sessions.

    Does nothing when a client has already been created.

    Raises:
        RuntimeError: If ``ADMIN_USERNAME`` or ``ADMIN_PASSWORD`` is empty
            in the application settings.
    """
    global _admin_redis

    if _admin_redis is not None:
        return

    has_credentials = bool(settings.ADMIN_USERNAME) and bool(settings.ADMIN_PASSWORD)
    if not has_credentials:
        raise RuntimeError("ENABLE_ADMIN=true requires ADMIN_USERNAME and ADMIN_PASSWORD to be set.")

    # decode_responses=True makes the client return str values instead of bytes.
    client_options = {"encoding": "utf-8", "decode_responses": True}
    _admin_redis = aioredis.from_url(settings.REDIS_URL, **client_options)
|
|
|
|
|
|
async def shutdown_admin_web() -> None:
    """Close the shared admin Redis client if one exists."""
    global _admin_redis

    client = _admin_redis
    if client is None:
        return

    try:
        await client.close()
    finally:
        # Clear the reference even when close() raises, so that a later
        # configure_admin_web() call creates a new client.
        _admin_redis = None
|
|
|
|
|
|
async def _current_admin(request: Request) -> AdminPrincipal | None:
    """Resolve the admin behind *request* from its session cookie.

    Returns:
        An ``AdminPrincipal`` for the username stored under the session key,
        or ``None`` when the Redis client is not configured, the cookie is
        missing or empty, or no username is stored for the token.
    """
    if _admin_redis is None:
        return None

    token = request.cookies.get(SESSION_COOKIE)
    # Only query Redis when the request actually carries a session token.
    username = await _admin_redis.get(f"{SESSION_PREFIX}{token}") if token else None
    return AdminPrincipal(username=str(username)) if username else None
|
|
|
|
|
|
def _login_redirect() -> RedirectResponse:
    """Build a 303 redirect to the admin login page."""
    return RedirectResponse("/admin/login", status_code=HTTP_303_SEE_OTHER)
|
|
|
|
|
|
def _dashboard_redirect() -> RedirectResponse:
    """Build a 303 redirect to the admin dashboard."""
    return RedirectResponse("/admin/dashboard", status_code=HTTP_303_SEE_OTHER)
|
|
|
|
|
|
def _render_auth_page(
    request: Request,
    title: str,
    subtitle: str,
    body: str,
    status_code: int = 200,
) -> HTMLResponse:
    """Wrap *body* in the standalone page shell used for auth screens.

    ``title`` and ``subtitle`` are HTML-escaped. ``body`` is inserted as raw
    HTML after each ``__REMEMBER_ME_CHECKED__`` placeholder in it has been
    replaced with ``checked`` (when the ``remember_me`` cookie equals ``on``)
    or with an empty string.

    Returns:
        An ``HTMLResponse`` carrying the page and the given ``status_code``.
    """
    remembered = request.cookies.get("remember_me") == "on"
    panel_content = body.replace("__REMEMBER_ME_CHECKED__", "checked" if remembered else "")
    safe_title = escape(title)
    safe_subtitle = escape(subtitle)

    # Kept outside the f-string so the CSS braces need no doubling.
    stylesheet = "\n".join(
        (
            'body { margin: 0; min-height: 100vh; display: grid; place-items: center; background: linear-gradient(135deg, #f8fafc, #e2e8f0); font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif; color: #0f172a; }',
            ".panel { width: min(420px, calc(100vw - 32px)); background: rgba(255,255,255,0.96); border-radius: 18px; box-shadow: 0 18px 60px rgba(15, 23, 42, 0.14); padding: 28px; }",
            "h1 { margin: 0 0 8px; font-size: 28px; }",
            "p { margin: 0 0 20px; color: #475569; }",
            "label { display: block; font-size: 14px; font-weight: 600; margin: 14px 0 8px; }",
            "input { width: 100%; box-sizing: border-box; border: 1px solid #cbd5e1; border-radius: 10px; padding: 12px 14px; font-size: 15px; }",
            ".row { display: flex; align-items: center; gap: 10px; margin-top: 16px; color: #334155; font-size: 14px; }",
            ".row input { width: auto; }",
            "button { width: 100%; margin-top: 18px; border: 0; border-radius: 10px; padding: 12px 14px; background: #0f172a; color: #fff; font-size: 15px; font-weight: 600; cursor: pointer; }",
            ".error { margin: 0 0 16px; padding: 12px 14px; border-radius: 10px; background: #fef2f2; color: #991b1b; border: 1px solid #fecaca; }",
            ".muted { color: #64748b; font-size: 13px; margin-top: 14px; }",
            "a { color: #0f172a; }",
        )
    )

    document = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{safe_title}</title>
<style>
{stylesheet}
</style>
</head>
<body>
<main class="panel">
<h1>{safe_title}</h1>
<p>{safe_subtitle}</p>
{panel_content}
</main>
</body>
</html>"""
    return HTMLResponse(document, status_code=status_code)
|
|
|
|
|
|
def _render_admin_page(title: str, page_title: str, body: str) -> HTMLResponse:
    """Wrap *body* in the admin layout (sidebar navigation plus content card).

    ``title`` (the document title) and ``page_title`` (the card heading) are
    HTML-escaped; ``body`` is inserted as raw HTML, so callers must escape
    any untrusted values inside it themselves.
    """
    safe_title = escape(title)
    safe_heading = escape(page_title)

    # Kept outside the f-string so the CSS braces need no doubling.
    stylesheet = "\n".join(
        (
            'body { margin: 0; font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif; background: #f5f7fb; color: #162033; }',
            ".layout { display: grid; grid-template-columns: 240px 1fr; min-height: 100vh; }",
            ".sidebar { background: #0f172a; color: #e2e8f0; padding: 24px 16px; }",
            ".sidebar h1 { font-size: 18px; margin: 0 0 24px; }",
            ".sidebar a { display: block; color: #cbd5e1; text-decoration: none; padding: 10px 12px; border-radius: 8px; margin-bottom: 8px; }",
            ".sidebar a:hover { background: #1e293b; color: #fff; }",
            ".content { padding: 32px; }",
            ".card { background: #fff; border-radius: 14px; padding: 24px; box-shadow: 0 8px 30px rgba(15, 23, 42, 0.08); }",
            ".grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(180px, 1fr)); gap: 16px; margin-top: 20px; }",
            ".stat { padding: 18px; border: 1px solid #e2e8f0; border-radius: 12px; background: #f8fafc; }",
            ".stat strong { display: block; font-size: 26px; margin-top: 6px; }",
            "table { width: 100%; border-collapse: collapse; margin-top: 16px; background: #fff; border-radius: 12px; overflow: hidden; }",
            "th, td { padding: 12px 14px; border-bottom: 1px solid #e5e7eb; text-align: left; vertical-align: top; font-size: 14px; }",
            "th { background: #f8fafc; font-weight: 600; }",
            "input, select, textarea { width: 100%; box-sizing: border-box; border: 1px solid #cbd5e1; border-radius: 10px; padding: 12px 14px; font-size: 15px; }",
            "label { display: block; font-size: 14px; font-weight: 600; margin: 14px 0 8px; }",
            "button { border: 0; border-radius: 10px; padding: 12px 14px; background: #0f172a; color: #fff; font-size: 15px; font-weight: 600; cursor: pointer; }",
            ".actions { display: flex; gap: 12px; flex-wrap: wrap; margin-top: 18px; }",
            ".row { display: flex; align-items: center; gap: 10px; margin-top: 12px; color: #334155; font-size: 14px; }",
            ".row input { width: auto; }",
            ".error { margin: 0 0 16px; padding: 12px 14px; border-radius: 10px; background: #fef2f2; color: #991b1b; border: 1px solid #fecaca; }",
            ".success { margin: 0 0 16px; padding: 12px 14px; border-radius: 10px; background: #ecfdf5; color: #166534; border: 1px solid #86efac; }",
            ".muted { color: #64748b; font-size: 14px; }",
        )
    )

    # Sidebar entries in display order: (href, label).
    nav_entries = (
        ("/admin/dashboard", "Dashboard"),
        ("/admin/websites", "Websites"),
        ("/admin/tryout-import", "Tryout Import"),
        ("/admin/basis-items", "Basis Items"),
        ("/admin/calibration-status", "Calibration Status"),
        ("/admin/item-statistics", "Item Statistics"),
        ("/admin/session-overview", "Session Overview"),
        ("/admin/ai-playground", "AI Playground"),
        ("/admin/password", "Password Info"),
        ("/admin/logout", "Logout"),
    )
    nav_links = "\n".join(f'<a href="{href}">{label}</a>' for href, label in nav_entries)

    document = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{safe_title}</title>
<style>
{stylesheet}
</style>
</head>
<body>
<div class="layout">
<aside class="sidebar">
<h1>IRT Bank Soal Admin</h1>
{nav_links}
</aside>
<main class="content">
<div class="card">
<h2>{safe_heading}</h2>
{body}
</div>
</main>
</div>
</body>
</html>"""
    return HTMLResponse(document)
|
|
|
|
|
|
def _table(headers: list[str], rows: list[list[Any]]) -> str:
|
|
head = "".join(f"<th>{escape(str(header))}</th>" for header in headers)
|
|
body_rows = []
|
|
for row in rows:
|
|
cols = "".join(f"<td>{escape(str(value))}</td>" for value in row)
|
|
body_rows.append(f"<tr>{cols}</tr>")
|
|
body = "".join(body_rows) or f"<tr><td colspan=\"{len(headers)}\">No data</td></tr>"
|
|
return f"<table><thead><tr>{head}</tr></thead><tbody>{body}</tbody></table>"
|
|
|
|
|
|
def _truncate(text: str | None, max_length: int = 120) -> str:
|
|
if not text:
|
|
return ""
|
|
if len(text) <= max_length:
|
|
return text
|
|
return f"{text[: max_length - 3]}..."
|
|
|
|
|
|
def _html_to_text(value: str | None) -> str:
|
|
if not value:
|
|
return ""
|
|
text = re.sub(r"<[^>]+>", " ", value)
|
|
text = unescape(text)
|
|
text = re.sub(r"\s+", " ", text).strip()
|
|
return text
|
|
|
|
|
|
def _websites_form_body(
|
|
websites: list[Website],
|
|
error: str | None = None,
|
|
success: str | None = None,
|
|
site_name: str = "",
|
|
site_url: str = "",
|
|
) -> str:
|
|
error_html = f'<div class="error">{escape(error)}</div>' if error else ""
|
|
success_html = f'<div class="success">{escape(success)}</div>' if success else ""
|
|
body_rows = []
|
|
for website in websites:
|
|
actions = f"""
|
|
<div class="actions" style="margin-top:0">
|
|
<a href="/admin/websites/{website.id}/edit" style="display:inline-block;padding:8px 12px;border-radius:8px;background:#0f172a;color:#fff;text-decoration:none;">Edit</a>
|
|
<form method="post" action="/admin/websites/{website.id}/delete" onsubmit="return confirm('Delete website {escape(website.site_name)} and all related tryouts, items, sessions, and snapshots?');" style="margin:0">
|
|
<button type="submit" style="background:#991b1b;">Delete</button>
|
|
</form>
|
|
</div>
|
|
"""
|
|
body_rows.append(
|
|
"<tr>"
|
|
f"<td>{website.id}</td>"
|
|
f"<td>{escape(website.site_name)}</td>"
|
|
f"<td>{escape(website.site_url)}</td>"
|
|
f"<td>{actions}</td>"
|
|
"</tr>"
|
|
)
|
|
if body_rows:
|
|
websites_table = (
|
|
"<table><thead><tr><th>ID</th><th>Name</th><th>URL</th><th>Actions</th></tr></thead><tbody>"
|
|
+ "".join(body_rows)
|
|
+ "</tbody></table>"
|
|
)
|
|
else:
|
|
websites_table = _table(["ID", "Name", "URL", "Actions"], [])
|
|
return f"""
|
|
<p class="muted">Register websites here so imports and tryout references can be tied to a known source site.</p>
|
|
{success_html}
|
|
{error_html}
|
|
<form method="post" action="/admin/websites" autocomplete="off">
|
|
<label for="site_name">Website Name</label>
|
|
<input id="site_name" name="site_name" type="text" value="{escape(site_name)}" placeholder="Sejoli Demo Site">
|
|
<label for="site_url">Website URL</label>
|
|
<input id="site_url" name="site_url" type="url" value="{escape(site_url)}" placeholder="https://example.com">
|
|
<button type="submit">Add Website</button>
|
|
</form>
|
|
<h3 style="margin-top:24px">Registered Websites</h3>
|
|
<p class="muted">Use the website ID when importing read-only tryout snapshots.</p>
|
|
{websites_table}
|
|
"""
|
|
|
|
|
|
def _website_edit_form_body(
|
|
website: Website,
|
|
error: str | None = None,
|
|
success: str | None = None,
|
|
site_name: str | None = None,
|
|
site_url: str | None = None,
|
|
) -> str:
|
|
error_html = f'<div class="error">{escape(error)}</div>' if error else ""
|
|
success_html = f'<div class="success">{escape(success)}</div>' if success else ""
|
|
display_name = website.site_name if site_name is None else site_name
|
|
display_url = website.site_url if site_url is None else site_url
|
|
return f"""
|
|
<p class="muted">Website ID: <strong>{website.id}</strong></p>
|
|
{success_html}
|
|
{error_html}
|
|
<form method="post" action="/admin/websites/{website.id}/edit" autocomplete="off">
|
|
<label for="site_name">Website Name</label>
|
|
<input id="site_name" name="site_name" type="text" value="{escape(display_name)}">
|
|
<label for="site_url">Website URL</label>
|
|
<input id="site_url" name="site_url" type="url" value="{escape(display_url)}">
|
|
<div class="actions">
|
|
<button type="submit">Save Changes</button>
|
|
<a href="/admin/websites" style="display:inline-block;padding:12px 14px;border-radius:10px;background:#e2e8f0;color:#0f172a;text-decoration:none;font-size:15px;font-weight:600;">Back</a>
|
|
</div>
|
|
</form>
|
|
"""
|
|
|
|
|
|
def _tryout_import_form_body(
|
|
websites: list[Website],
|
|
recent_snapshots: list[TryoutImportSnapshot],
|
|
error: str | None = None,
|
|
success: str | None = None,
|
|
selected_website_id: int | None = None,
|
|
preview: dict[str, Any] | None = None,
|
|
preview_token: str | None = None,
|
|
upload_filename: str = "",
|
|
) -> str:
|
|
error_html = f'<div class="error">{escape(error)}</div>' if error else ""
|
|
success_html = f'<div class="success">{escape(success)}</div>' if success else ""
|
|
|
|
website_options = ['<option value="">Select website</option>']
|
|
for website in websites:
|
|
selected = "selected" if selected_website_id == website.id else ""
|
|
website_options.append(
|
|
f'<option value="{website.id}" {selected}>{escape(website.site_name)} (#{website.id})</option>'
|
|
)
|
|
|
|
website_map = {website.id: website.site_name for website in websites}
|
|
snapshot_rows = []
|
|
for snapshot in recent_snapshots:
|
|
snapshot_rows.append(
|
|
"<tr>"
|
|
f"<td>{snapshot.id}</td>"
|
|
f"<td>{escape(website_map.get(snapshot.website_id, 'Unknown'))} (#{snapshot.website_id})</td>"
|
|
f"<td>{escape(snapshot.source_tryout_id)}</td>"
|
|
f"<td>{escape(snapshot.title)}</td>"
|
|
f"<td>{snapshot.question_count}</td>"
|
|
f"<td>{escape(str(snapshot.created_at))}</td>"
|
|
f"<td><a href=\"/admin/snapshot-questions?snapshot_id={snapshot.id}\" "
|
|
"style=\"display:inline-block;padding:8px 12px;border-radius:8px;background:#0f172a;color:#fff;text-decoration:none;\">Browse</a></td>"
|
|
"</tr>"
|
|
)
|
|
snapshots_table = (
|
|
"<table><thead><tr><th>Snapshot ID</th><th>Website</th><th>Tryout ID</th><th>Title</th><th>Questions</th><th>Imported At</th><th>Actions</th></tr></thead><tbody>"
|
|
+ ("".join(snapshot_rows) if snapshot_rows else "<tr><td colspan=\"7\">No data</td></tr>")
|
|
+ "</tbody></table>"
|
|
)
|
|
|
|
preview_html = ""
|
|
if preview:
|
|
totals = preview.get("totals") or {}
|
|
tryout_rows = []
|
|
for tryout in preview.get("tryouts") or []:
|
|
diff = tryout.get("question_diff") or {}
|
|
warnings = "; ".join(tryout.get("warnings") or []) or "-"
|
|
tryout_rows.append(
|
|
[
|
|
tryout.get("source_tryout_id"),
|
|
tryout.get("title"),
|
|
diff.get("total_questions", 0),
|
|
diff.get("new_questions", 0),
|
|
diff.get("updated_questions", 0),
|
|
diff.get("unchanged_questions", 0),
|
|
diff.get("removed_questions", 0),
|
|
warnings,
|
|
]
|
|
)
|
|
|
|
import_form = ""
|
|
if preview_token and selected_website_id:
|
|
import_form = f"""
|
|
<form method="post" action="/admin/tryout-import" autocomplete="off">
|
|
<input type="hidden" name="website_id" value="{selected_website_id}">
|
|
<input type="hidden" name="preview_token" value="{escape(preview_token)}">
|
|
<button type="submit">Import Snapshot</button>
|
|
</form>
|
|
"""
|
|
|
|
preview_html = f"""
|
|
<h3 style="margin-top:24px">Preview Summary</h3>
|
|
<p class="muted">File: <strong>{escape(upload_filename or "uploaded JSON")}</strong></p>
|
|
<div class="grid">
|
|
<div class="stat">Tryouts<strong>{preview.get("tryout_count", 0)}</strong></div>
|
|
<div class="stat">New Questions<strong>{totals.get("new_questions", 0)}</strong></div>
|
|
<div class="stat">Updated Questions<strong>{totals.get("updated_questions", 0)}</strong></div>
|
|
<div class="stat">Removed Questions<strong>{totals.get("removed_questions", 0)}</strong></div>
|
|
</div>
|
|
{_table(
|
|
["Tryout ID", "Title", "Total", "New", "Updated", "Unchanged", "Removed", "Warnings"],
|
|
tryout_rows,
|
|
)}
|
|
<div class="actions">{import_form}</div>
|
|
"""
|
|
|
|
return f"""
|
|
<p class="muted">Import Sejoli tryout JSON as read-only snapshot reference data. This does not create live item-bank questions.</p>
|
|
<p class="muted">Use this when the source tryout changes upstream. Re-import updates matching source question IDs, inserts new ones, and marks missing ones inactive.</p>
|
|
{success_html}
|
|
{error_html}
|
|
<form method="post" action="/admin/tryout-import/preview" enctype="multipart/form-data" autocomplete="off">
|
|
<label for="website_id">Website</label>
|
|
<select id="website_id" name="website_id">{''.join(website_options)}</select>
|
|
<label for="file">Tryout Export JSON</label>
|
|
<input id="file" name="file" type="file" accept=".json,application/json">
|
|
<button type="submit">Preview Import</button>
|
|
</form>
|
|
{preview_html}
|
|
<h3 style="margin-top:24px">Recent Snapshots</h3>
|
|
<p class="muted">These are archived imports stored in PostgreSQL for traceability.</p>
|
|
{snapshots_table}
|
|
"""
|
|
|
|
|
|
def _snapshot_slot_map(snapshot: TryoutImportSnapshot) -> dict[str, int]:
|
|
slot_map: dict[str, int] = {}
|
|
questions = (snapshot.raw_payload or {}).get("questions") or []
|
|
for index, question in enumerate(questions, start=1):
|
|
source_question_id = str((question or {}).get("id") or "").strip()
|
|
if source_question_id:
|
|
slot_map[source_question_id] = index
|
|
return slot_map
|
|
|
|
|
|
def _snapshot_options_to_item_options(raw_options: list[dict[str, Any]] | list[Any]) -> dict[str, str]:
|
|
item_options: dict[str, str] = {}
|
|
for option in raw_options or []:
|
|
if not isinstance(option, dict):
|
|
continue
|
|
increment = str(option.get("increment") or "").strip().upper()
|
|
text = str(option.get("text") or option.get("label") or "").strip()
|
|
if increment and text:
|
|
item_options[increment] = text
|
|
return item_options
|
|
|
|
|
|
def _snapshot_questions_body(
    snapshot: TryoutImportSnapshot,
    questions: list[TryoutSnapshotQuestion],
    promoted_items_by_slot: dict[int, Item],
    error: str | None = None,
    success: str | None = None,
) -> str:
    """Render the question list of a snapshot with bulk-promote controls.

    Each question is matched to a slot via the snapshot's raw payload order.
    Questions whose slot already has an entry in ``promoted_items_by_slot``
    show a link to that item instead of a selection checkbox.

    Args:
        snapshot: Snapshot being browsed.
        questions: Snapshot questions to list.
        promoted_items_by_slot: Existing items keyed by slot number.
        error: Optional error banner text (HTML-escaped).
        success: Optional success banner text (HTML-escaped).

    Returns:
        An HTML fragment for the body of the snapshot-questions page.
    """
    error_html = f'<div class="error">{escape(error)}</div>' if error else ""
    success_html = f'<div class="success">{escape(success)}</div>' if success else ""
    slot_map = _snapshot_slot_map(snapshot)
    rows = []
    for question in questions:
        # Slot 0 means the question ID was not found in the raw payload; it
        # is displayed as "-".
        slot = slot_map.get(question.source_question_id, 0)
        promoted_item = promoted_items_by_slot.get(slot)
        if promoted_item:
            select_html = ""
            action_html = (
                f"Item #{promoted_item.id} already exists. "
                f'<a href="/admin/ai-playground?basis_item_id={promoted_item.id}">Open in AI Playground</a>'
            )
        else:
            select_html = f'<input type="checkbox" name="snapshot_question_ids" value="{question.id}">'
            action_html = "Ready to promote"
        rows.append(
            "<tr>"
            f"<td>{select_html}</td>"
            f"<td>{slot or '-'}</td>"
            f"<td>{escape(question.source_question_id)}</td>"
            f"<td>{escape(question.correct_answer)}</td>"
            f"<td>{question.option_count}</td>"
            f"<td>{'Yes' if question.is_active else 'No'}</td>"
            f"<td>{escape(_truncate(question.question_title or question.question_html, 100))}</td>"
            f"<td>{action_html}</td>"
            "</tr>"
        )

    # The selector uses an unquoted attribute value: a literal double quote
    # here would end the surrounding onclick="..." attribute (and, unescaped
    # in the source, the Python string itself).
    select_all_js = (
        "document.querySelectorAll('input[name=snapshot_question_ids]')"
        ".forEach(el => el.checked = this.checked)"
    )
    questions_table = (
        '<form method="post" action="/admin/snapshot-questions/promote-bulk">'
        f'<input type="hidden" name="snapshot_id" value="{snapshot.id}">'
        '<div class="actions" style="margin:16px 0">'
        '<button type="submit">Promote Selected as Basis Items</button>'
        "</div>"
        f'<table><thead><tr><th><input type="checkbox" onclick="{select_all_js}"></th>'
        "<th>Slot</th><th>Source Question ID</th><th>Correct</th><th>Options</th><th>Active</th><th>Stem</th><th>Action</th></tr></thead><tbody>"
        + ("".join(rows) if rows else '<tr><td colspan="8">No data</td></tr>')
        + "</tbody></table>"
        "</form>"
    )
    return f"""
<p class="muted">Snapshot ID: <strong>{snapshot.id}</strong> | Website: <strong>{snapshot.website_id}</strong> | Tryout: <strong>{escape(snapshot.source_tryout_id)}</strong></p>
<p class="muted">Promote selected snapshot questions into the live <code>items</code> table as <code>sedang</code> basis items for AI generation.</p>
{success_html}
{error_html}
{questions_table}
<p style="margin-top:20px"><a href="/admin/tryout-import">Back to Tryout Import</a></p>
"""
|
|
|
|
|
|
async def _basis_items_for_playground(db: AsyncSession, limit: int = 20) -> list[Item]:
    """Return up to ``limit`` items of level ``sedang``, newest first.

    Ordering is by ``created_at`` descending, then ``id`` descending.
    """
    stmt = (
        select(Item)
        .where(Item.level == "sedang")
        .order_by(Item.created_at.desc(), Item.id.desc())
        .limit(limit)
    )
    fetched = await db.execute(stmt)
    return list(fetched.scalars().all())
|
|
|
|
|
|
async def _recent_generation_runs(db: AsyncSession, limit: int = 20) -> list[AIGenerationRun]:
    """Return up to ``limit`` AI generation runs, highest ``id`` first."""
    stmt = select(AIGenerationRun).order_by(AIGenerationRun.id.desc()).limit(limit)
    fetched = await db.execute(stmt)
    return list(fetched.scalars().all())
|
|
|
|
|
|
async def _recent_generated_variants(
    db: AsyncSession,
    limit: int = 100,
    basis_item_id: int | None = None,
) -> list[Item]:
    """Return up to ``limit`` AI-generated items, newest first.

    When ``basis_item_id`` is given, only items whose ``basis_item_id``
    matches it are returned.
    """
    criteria = [Item.generated_by == "ai"]
    if basis_item_id is not None:
        criteria.append(Item.basis_item_id == basis_item_id)
    stmt = (
        select(Item)
        .where(*criteria)
        .order_by(Item.created_at.desc(), Item.id.desc())
        .limit(limit)
    )
    fetched = await db.execute(stmt)
    return list(fetched.scalars().all())
|
|
|
|
|
|
async def _usage_metrics_for_items(
|
|
db: AsyncSession,
|
|
item_ids: list[int],
|
|
) -> dict[int, dict[str, float]]:
|
|
if not item_ids:
|
|
return {}
|
|
|
|
result = await db.execute(
|
|
select(
|
|
UserAnswer.item_id,
|
|
func.count(UserAnswer.id).label("impressions"),
|
|
func.count(func.distinct(UserAnswer.wp_user_id)).label("unique_users"),
|
|
)
|
|
.where(UserAnswer.item_id.in_(item_ids))
|
|
.group_by(UserAnswer.item_id)
|
|
)
|
|
|
|
metrics: dict[int, dict[str, float]] = {}
|
|
for item_id, impressions, unique_users in result.all():
|
|
impressions_i = int(impressions or 0)
|
|
unique_users_i = int(unique_users or 0)
|
|
frequency = (impressions_i / unique_users_i) if unique_users_i else 0.0
|
|
metrics[int(item_id)] = {
|
|
"impressions": float(impressions_i),
|
|
"unique_users": float(unique_users_i),
|
|
"frequency": float(frequency),
|
|
}
|
|
return metrics
|
|
|
|
|
|
async def _family_usage_stats(
    db: AsyncSession,
    basis_item: Item,
    variants: list[Item],
) -> tuple[dict[int, dict[str, float]], dict[str, float]]:
    """Compute usage metrics for a basis item together with its variants.

    Returns:
        A pair ``(per_item, family)``: ``per_item`` is the result of
        ``_usage_metrics_for_items`` for the whole family, and ``family``
        holds the summed ``impressions``, the number of distinct users
        across all family items (``unique_users``) and the resulting
        ``frequency`` (``0.0`` when there are no users).
    """
    family_item_ids = [basis_item.id, *(variant.id for variant in variants)]
    per_item = await _usage_metrics_for_items(db, family_item_ids)
    total_impressions = int(sum(entry["impressions"] for entry in per_item.values()))

    # Users are counted across the whole family in one query, so a user who
    # answered several family items is counted once.
    distinct_users_stmt = select(func.count(func.distinct(UserAnswer.wp_user_id))).where(
        UserAnswer.item_id.in_(family_item_ids)
    )
    total_users = int(await db.scalar(distinct_users_stmt) or 0)

    family_frequency = (total_impressions / total_users) if total_users else 0.0
    family = {
        "impressions": float(total_impressions),
        "unique_users": float(total_users),
        "frequency": float(family_frequency),
    }
    return per_item, family
|
|
|
|
|
|
def _basis_items_list_body(items: list[Item]) -> str:
|
|
rows = []
|
|
for item in items:
|
|
rows.append(
|
|
"<tr>"
|
|
f"<td>{item.id}</td>"
|
|
f"<td>{escape(item.tryout_id)}</td>"
|
|
f"<td>{item.slot}</td>"
|
|
f"<td>{item.website_id}</td>"
|
|
f"<td>{escape(_truncate(item.stem, 120))}</td>"
|
|
f"<td>{item.source_snapshot_question_id or '-'}</td>"
|
|
f"<td><a href=\"/admin/basis-items/{item.id}\" style=\"display:inline-block;padding:8px 12px;border-radius:8px;background:#0f172a;color:#fff;text-decoration:none;\">Open Workspace</a></td>"
|
|
"</tr>"
|
|
)
|
|
table = (
|
|
"<table><thead><tr><th>Item ID</th><th>Tryout</th><th>Slot</th><th>Website</th><th>Stem</th><th>Source Snapshot QID</th><th>Actions</th></tr></thead><tbody>"
|
|
+ ("".join(rows) if rows else "<tr><td colspan=\"7\">No basis items found.</td></tr>")
|
|
+ "</tbody></table>"
|
|
)
|
|
return f"""
|
|
<p class="muted">Basis items are canonical parent questions (<code>sedang</code>, non-AI). Open a workspace to generate and review AI child variants.</p>
|
|
{table}
|
|
"""
|
|
|
|
|
|
def _basis_item_workspace_body(
    basis_item: Item,
    runs: list[AIGenerationRun],
    variants: list[Item],
    usage_by_item: dict[int, dict[str, float]],
    family_stats: dict[str, float],
    filters: dict[str, str],
    error: str | None = None,
    success: str | None = None,
    target_level: str = "mudah",
    ai_model: str = settings.OPENROUTER_MODEL_QWEN,
    generation_count: str = "1",
    operator_notes: str = "",
    include_note_for_admin: bool = True,
    include_note_in_prompt: bool = False,
) -> str:
    """Render the workspace page body for one basis (parent) item.

    The fragment has five sections: a parent summary, the variant generation
    form, the variant filter form, the generation-run table, and the
    child-variant review table with bulk actions.

    Args:
        basis_item: Parent item the workspace belongs to.
        runs: Generation runs listed in the run table.
        variants: Child variants listed in the review table (already
            filtered by the caller; this function only displays them).
        usage_by_item: Per-item usage metrics keyed by item ID; items that
            are missing are shown with zeros.
        family_stats: Family-wide ``impressions``, ``unique_users`` and
            ``frequency``.
        filters: Current filter values; read keys are ``status``, ``level``,
            ``min_frequency`` and ``run_id``.
        error: Optional error banner text (HTML-escaped).
        success: Optional success banner text (HTML-escaped).
        target_level: Level pre-selected in the generation form.
        ai_model: Model pre-selected in the generation form.
        generation_count: Value pre-filled into the count field.
        operator_notes: Text pre-filled into the notes field.
        include_note_for_admin: Initial state of the "save note" checkbox.
        include_note_in_prompt: Initial state of the "include in prompt"
            checkbox.

    Returns:
        An HTML fragment for the body of the workspace page.
    """
    error_html = f'<div class="error">{escape(error)}</div>' if error else ""
    success_html = f'<div class="success">{escape(success)}</div>' if success else ""
    model_options = "".join(
        f'<option value="{escape(model)}" {"selected" if model == ai_model else ""}>{escape(label)}</option>'
        for model, label in SUPPORTED_MODELS.items()
    )
    status_filter = filters.get("status", "")
    level_filter = filters.get("level", "")
    min_frequency_filter = filters.get("min_frequency", "")
    run_id_filter = filters.get("run_id", "")

    run_rows = [
        [
            run.id,
            run.target_level,
            run.requested_count,
            run.model,
            run.created_by,
            str(run.created_at),
        ]
        for run in runs
    ]
    runs_table = _table(
        ["Run ID", "Target", "Requested", "Model", "Created By", "Created At"],
        run_rows,
    )

    variant_rows = []
    for item in variants:
        usage = usage_by_item.get(item.id, {"impressions": 0.0, "unique_users": 0.0, "frequency": 0.0})
        variant_rows.append(
            "<tr>"
            f'<td><input type="checkbox" name="item_ids" value="{item.id}"></td>'
            f"<td>{item.id}</td>"
            f"<td>{item.generation_run_id or '-'}</td>"
            f"<td>{escape(item.level)}</td>"
            f"<td>{escape(item.variant_status)}</td>"
            f"<td>{escape(item.ai_model or '-')}</td>"
            f"<td>{int(usage['impressions'])}</td>"
            f"<td>{int(usage['unique_users'])}</td>"
            f"<td>{usage['frequency']:.2f}</td>"
            f"<td>{escape(_truncate(item.stem, 130))}</td>"
            f"<td>{escape(str(item.created_at))}</td>"
            "</tr>"
        )

    # The selector uses an unquoted attribute value: a literal double quote
    # here would end the surrounding onclick="..." attribute (and, unescaped
    # in the source, the Python string itself).
    select_all_js = (
        "document.querySelectorAll('input[name=item_ids]')"
        ".forEach(el => el.checked = this.checked)"
    )
    variants_table = (
        f'<form method="post" action="/admin/basis-items/{basis_item.id}/review-bulk">'
        '<div class="actions" style="margin:16px 0">'
        '<select name="action" style="max-width:260px">'
        '<option value="approved">Approve selected</option>'
        '<option value="rejected">Reject selected</option>'
        '<option value="archived">Archive selected</option>'
        '<option value="stale">Mark stale</option>'
        '<option value="active">Activate selected</option>'
        "</select>"
        '<button type="submit">Apply</button>'
        "</div>"
        f'<table><thead><tr><th><input type="checkbox" onclick="{select_all_js}"></th>'
        "<th>Item ID</th><th>Run ID</th><th>Level</th><th>Status</th><th>Model</th><th>Impressions</th><th>Unique Users</th><th>Frequency</th><th>Stem</th><th>Created At</th></tr></thead><tbody>"
        + ("".join(variant_rows) if variant_rows else '<tr><td colspan="11">No generated variants yet for this parent.</td></tr>')
        + "</tbody></table></form>"
    )

    return f"""
{success_html}
{error_html}
<section style="border:1px solid #e2e8f0;border-radius:12px;padding:16px;background:#f8fafc;">
<h3 style="margin:0 0 10px;">Parent Summary</h3>
<p class="muted" style="margin:0 0 8px;">
Parent Item: <strong>#{basis_item.id}</strong> |
Tryout: <strong>{escape(basis_item.tryout_id)}</strong> |
Slot: <strong>{basis_item.slot}</strong> |
Website: <strong>{basis_item.website_id}</strong> |
Source Snapshot QID: <strong>{basis_item.source_snapshot_question_id or '-'}</strong>
</p>
<p class="muted" style="margin:0 0 8px;">
Family Usage: impressions=<strong>{int(family_stats.get("impressions", 0.0))}</strong>,
unique users=<strong>{int(family_stats.get("unique_users", 0.0))}</strong>,
frequency=<strong>{family_stats.get("frequency", 0.0):.2f}</strong>
</p>
<p class="muted" style="margin:0;"><strong>Stem:</strong> {escape(_truncate(_html_to_text(basis_item.stem), 260))}</p>
</section>

<section style="margin-top:16px;border:1px solid #e2e8f0;border-radius:12px;padding:16px;background:#fff;">
<h3 style="margin:0 0 8px;">Generate Variants</h3>
<p class="muted" style="margin:0 0 12px;">Create new AI child variants for this parent.</p>
<form method="post" action="/admin/basis-items/{basis_item.id}/generate" autocomplete="off">
<label for="target_level">Target Level</label>
<select id="target_level" name="target_level">
<option value="mudah" {"selected" if target_level == "mudah" else ""}>mudah</option>
<option value="sulit" {"selected" if target_level == "sulit" else ""}>sulit</option>
</select>
<label for="ai_model">Model</label>
<select id="ai_model" name="ai_model">{model_options}</select>
<label for="generation_count">Generate Count</label>
<input id="generation_count" name="generation_count" type="number" min="1" max="50" value="{escape(generation_count)}">
<p class="muted">Recommended: 1-3 per run. Larger runs increase overlap and review burden.</p>
<label for="operator_notes">Operator Notes (optional)</label>
<textarea id="operator_notes" name="operator_notes" rows="3">{escape(operator_notes)}</textarea>
<label class="row"><input type="checkbox" name="include_note_for_admin" {"checked" if include_note_for_admin else ""}> Save note for admin team (visible in run history)</label>
<label class="row"><input type="checkbox" name="include_note_in_prompt" {"checked" if include_note_in_prompt else ""}> Include note in AI prompt payload</label>
<p class="muted" style="margin-top:6px;">Example note: <code>Use clinical language, avoid negatives, keep stem under 40 words.</code></p>
<button type="submit">Generate Variants</button>
</form>
</section>

<section style="margin-top:16px;border:1px solid #e2e8f0;border-radius:12px;padding:16px;background:#fff;">
<h3 style="margin:0 0 8px;">Filter Variants</h3>
<p class="muted" style="margin:0 0 12px;">Filter child variants shown in the review table below.</p>
<form method="get" action="/admin/basis-items/{basis_item.id}" autocomplete="off">
<div style="display:grid;grid-template-columns:repeat(auto-fit,minmax(190px,1fr));gap:12px;align-items:end;">
<div>
<label for="status" style="margin:0 0 6px;">Status</label>
<select id="status" name="status">
<option value="" {"selected" if status_filter == "" else ""}>All</option>
<option value="draft" {"selected" if status_filter == "draft" else ""}>draft</option>
<option value="approved" {"selected" if status_filter == "approved" else ""}>approved</option>
<option value="active" {"selected" if status_filter == "active" else ""}>active</option>
<option value="rejected" {"selected" if status_filter == "rejected" else ""}>rejected</option>
<option value="archived" {"selected" if status_filter == "archived" else ""}>archived</option>
<option value="stale" {"selected" if status_filter == "stale" else ""}>stale</option>
</select>
</div>
<div>
<label for="level" style="margin:0 0 6px;">Level</label>
<select id="level" name="level">
<option value="" {"selected" if level_filter == "" else ""}>All</option>
<option value="mudah" {"selected" if level_filter == "mudah" else ""}>mudah</option>
<option value="sulit" {"selected" if level_filter == "sulit" else ""}>sulit</option>
</select>
</div>
<div>
<label for="run_id" style="margin:0 0 6px;">Run ID</label>
<input id="run_id" name="run_id" type="number" min="1" value="{escape(run_id_filter)}">
</div>
<div>
<label for="min_frequency" style="margin:0 0 6px;">Min Frequency</label>
<input id="min_frequency" name="min_frequency" type="number" min="0" step="0.1" value="{escape(min_frequency_filter)}">
</div>
<div style="display:flex;gap:10px;align-items:center;flex-wrap:wrap;">
<button type="submit">Apply</button>
<a href="/admin/basis-items/{basis_item.id}" style="display:inline-block;padding:12px 14px;border-radius:10px;background:#e2e8f0;color:#0f172a;text-decoration:none;font-size:15px;font-weight:600;">Reset</a>
</div>
</div>
</form>
</section>

<section style="margin-top:16px;border:1px solid #e2e8f0;border-radius:12px;padding:16px;background:#fff;">
<h3 style="margin:0 0 12px;">Generation Runs for This Parent</h3>
{runs_table}
</section>

<section style="margin-top:16px;border:1px solid #e2e8f0;border-radius:12px;padding:16px;background:#fff;">
<h3 style="margin:0 0 8px;">Child Variants for This Parent</h3>
<p class="muted">Filtered variants shown: <strong>{len(variants)}</strong></p>
{variants_table}
</section>
<p style="margin-top:20px"><a href="/admin/basis-items">Back to Basis Items</a></p>
"""
|
|
|
|
|
|
async def _find_or_create_demo_basis_item(db: AsyncSession) -> Item:
    """Return the demo basis item, seeding it on first use.

    The lowest-id manual "sedang" item of tryout ``demo-tryout`` is returned
    when one exists. Otherwise the demo website and demo tryout are looked up
    (and inserted when missing), a sample item is inserted, the transaction is
    committed, and the refreshed item is returned.
    """
    demo_site_url = "https://demo.local"
    demo_tryout_id = "demo-tryout"

    item_lookup = (
        select(Item)
        .where(
            Item.level == "sedang",
            Item.generated_by == "manual",
            Item.tryout_id == demo_tryout_id,
        )
        .order_by(Item.id.asc())
        .limit(1)
    )
    found = (await db.execute(item_lookup)).scalar_one_or_none()
    if found:
        return found

    # Reuse the demo website when present; flush so its id is available below.
    website_lookup = select(Website).where(Website.site_url == demo_site_url).limit(1)
    website = (await db.execute(website_lookup)).scalar_one_or_none()
    if website is None:
        website = Website(site_url=demo_site_url, site_name="Demo Website")
        db.add(website)
        await db.flush()

    # Same find-or-insert step for the tryout that owns the demo item.
    tryout_lookup = (
        select(Tryout)
        .where(Tryout.website_id == website.id, Tryout.tryout_id == demo_tryout_id)
        .limit(1)
    )
    tryout = (await db.execute(tryout_lookup)).scalar_one_or_none()
    if tryout is None:
        tryout = Tryout(
            website_id=website.id,
            tryout_id=demo_tryout_id,
            name="Demo AI Playground Tryout",
            description="Seed data for the AI playground.",
            scoring_mode="ctt",
            selection_mode="fixed",
            normalization_mode="static",
            ai_generation_enabled=True,
        )
        db.add(tryout)
        await db.flush()

    seed_fields = {
        "tryout_id": tryout.tryout_id,
        "website_id": website.id,
        "slot": 1,
        "level": "sedang",
        "stem": "Sebuah toko memberi diskon 20% untuk sebuah tas. Jika harga setelah diskon adalah Rp240.000, berapakah harga tas sebelum diskon?",
        "options": {
            "A": "Rp260.000",
            "B": "Rp300.000",
            "C": "Rp320.000",
            "D": "Rp360.000",
        },
        "correct_answer": "B",
        "explanation": "Harga setelah diskon 20% berarti 80% dari harga awal. Jadi harga awal = 240.000 / 0,8 = 300.000.",
        "generated_by": "manual",
        "calibrated": False,
        "calibration_sample_size": 0,
    }
    seeded = Item(**seed_fields)
    db.add(seeded)
    await db.flush()
    await db.commit()
    await db.refresh(seeded)
    return seeded
|
|
|
|
|
|
async def _load_websites(db: AsyncSession) -> list[Website]:
    """Return every Website row, ordered by ascending id."""
    stmt = select(Website).order_by(Website.id.asc())
    rows = await db.execute(stmt)
    return list(rows.scalars().all())
|
|
|
|
|
|
async def _recent_snapshots(db: AsyncSession, limit: int = 20) -> list[TryoutImportSnapshot]:
    """Return up to ``limit`` import snapshots, highest id first."""
    stmt = select(TryoutImportSnapshot).order_by(TryoutImportSnapshot.id.desc()).limit(limit)
    rows = await db.execute(stmt)
    return list(rows.scalars().all())
|
|
|
|
|
|
async def _ensure_operational_tryout(snapshot: TryoutImportSnapshot, db: AsyncSession) -> Tryout:
    """Return the Tryout matching the snapshot's website and source tryout id.

    When no such row exists a new Tryout is added with fixed CTT defaults and
    flushed (not committed) before being returned.
    """
    lookup = select(Tryout).where(
        Tryout.website_id == snapshot.website_id,
        Tryout.tryout_id == snapshot.source_tryout_id,
    )
    current = (await db.execute(lookup)).scalar_one_or_none()
    if current:
        return current

    created = Tryout(
        website_id=snapshot.website_id,
        tryout_id=snapshot.source_tryout_id,
        name=snapshot.title,
        description=f"Operational tryout basis created from imported snapshot #{snapshot.id}.",
        scoring_mode="ctt",
        selection_mode="fixed",
        normalization_mode="static",
        ai_generation_enabled=True,
    )
    db.add(created)
    await db.flush()
    return created
|
|
|
|
|
|
async def _load_snapshot_question_context(
    snapshot: TryoutImportSnapshot,
    db: AsyncSession,
) -> tuple[list[TryoutSnapshotQuestion], dict[int, Item], dict[str, int]]:
    """Load the data needed to render a snapshot's question list.

    Returns a tuple of:
      * the snapshot questions for the snapshot's website/source tryout,
        sorted by mapped slot (unmapped questions last), then source question id;
      * the "sedang" items of the same website/tryout keyed by slot;
      * the slot map produced by ``_snapshot_slot_map``.
    """
    question_stmt = (
        select(TryoutSnapshotQuestion)
        .where(
            TryoutSnapshotQuestion.website_id == snapshot.website_id,
            TryoutSnapshotQuestion.source_tryout_id == snapshot.source_tryout_id,
        )
        .order_by(TryoutSnapshotQuestion.source_question_id.asc())
    )
    questions = list((await db.execute(question_stmt)).scalars().all())

    item_stmt = select(Item).where(
        Item.website_id == snapshot.website_id,
        Item.tryout_id == snapshot.source_tryout_id,
        Item.level == "sedang",
    )
    promoted_items_by_slot = {}
    for promoted in (await db.execute(item_stmt)).scalars().all():
        promoted_items_by_slot[promoted.slot] = promoted

    slot_map = _snapshot_slot_map(snapshot)

    def _display_order(row):
        # 10**9 pushes questions without a mapped slot to the end of the list.
        return (slot_map.get(row.source_question_id, 10**9), row.source_question_id)

    questions.sort(key=_display_order)
    return questions, promoted_items_by_slot, slot_map
|
|
|
|
|
|
async def _promote_snapshot_question_to_item(
    snapshot: TryoutImportSnapshot,
    question: TryoutSnapshotQuestion,
    db: AsyncSession,
) -> tuple[Item | None, str]:
    """Promote one snapshot question to a manual "sedang" basis item.

    Returns ``(item, status)`` where status is one of:
      * ``"mismatch"``        - the question belongs to another website/tryout;
      * ``"missing_options"`` - no usable options could be derived;
      * ``"existing"``        - a basis item already occupies the slot, or was
                                already promoted from this question;
      * ``"created"``         - a new item was added and flushed (not committed).

    ``item`` is ``None`` for the two skip statuses.
    """
    if (
        question.website_id != snapshot.website_id
        or question.source_tryout_id != snapshot.source_tryout_id
    ):
        return None, "mismatch"

    # Validate the payload first so a question that cannot be promoted costs
    # no database round trips.
    options = _snapshot_options_to_item_options(question.raw_options)
    if not options:
        return None, "missing_options"

    await _ensure_operational_tryout(snapshot, db)

    basis_scope = (
        Item.website_id == snapshot.website_id,
        Item.tryout_id == snapshot.source_tryout_id,
        Item.level == "sedang",
    )

    slot = _snapshot_slot_map(snapshot).get(question.source_question_id)
    if not slot:
        # Without a mapped slot the item goes into the next free slot. Look for
        # an item previously promoted from this same question first; otherwise
        # promoting it again would allocate a new slot and duplicate the item.
        prior_result = await db.execute(
            select(Item)
            .where(*basis_scope, Item.source_snapshot_question_id == question.id)
            .order_by(Item.id.asc())
            .limit(1)
        )
        prior_item = prior_result.scalar_one_or_none()
        if prior_item is not None:
            return prior_item, "existing"

        max_slot = await db.scalar(select(func.max(Item.slot)).where(*basis_scope)) or 0
        slot = max_slot + 1

    existing_item_result = await db.execute(
        select(Item).where(*basis_scope, Item.slot == slot)
    )
    existing_item = existing_item_result.scalar_one_or_none()
    if existing_item is not None:
        return existing_item, "existing"

    item = Item(
        tryout_id=snapshot.source_tryout_id,
        website_id=snapshot.website_id,
        slot=slot,
        level="sedang",
        stem=question.question_html,
        options=options,
        correct_answer=question.correct_answer,
        explanation=question.explanation_html,
        generated_by="manual",
        source_snapshot_question_id=question.id,
        variant_status="active",
        calibrated=False,
        calibration_sample_size=0,
    )
    db.add(item)
    await db.flush()
    return item, "created"
|
|
|
|
|
|
@router.get("", include_in_schema=False)
@router.get("/", include_in_schema=False)
async def admin_root(request: Request):
    """Send signed-in admins to the dashboard and everyone else to the login page."""
    if await _current_admin(request):
        return _dashboard_redirect()
    return _login_redirect()
|
|
|
|
|
|
@router.get("/login", include_in_schema=False)
async def login_view(request: Request):
    """Render the login form, or redirect to the dashboard when already signed in."""
    if await _current_admin(request):
        return _dashboard_redirect()

    # __REMEMBER_ME_CHECKED__ is left verbatim in the markup; presumably
    # _render_auth_page substitutes it - TODO confirm.
    form_markup = """
    <form method="post" action="/admin/login" autocomplete="off">
    <label for="username">Username</label>
    <input id="username" name="username" type="text" autocomplete="username">
    <label for="password">Password</label>
    <input id="password" name="password" type="password" autocomplete="current-password">
    <label class="row"><input type="checkbox" name="remember_me" __REMEMBER_ME_CHECKED__> Remember me</label>
    <button type="submit">Sign in</button>
    </form>
    <p class="muted">Direct environment-backed admin access.</p>
    """
    page_title = "Admin Login"
    page_intro = "Use the configured admin credentials to access the dashboard."
    return _render_auth_page(request, page_title, page_intro, form_markup)
|
|
|
|
|
|
@router.post("/login", include_in_schema=False)
async def login_submit(
    request: Request,
    username: str = Form(...),
    password: str = Form(...),
    remember_me: str | None = Form(None),
):
    """Check the submitted credentials and start an admin session.

    On failure the login form is re-rendered with HTTP 401 and the submitted
    username pre-filled (HTML-escaped). On success a random session token is
    stored in Redis under ``SESSION_PREFIX`` with the session lifetime, the
    token is set as an HttpOnly cookie scoped to ``/admin``, and the response
    redirects to the dashboard. Ticking "remember me" extends the lifetime to
    at least 30 days.
    """
    # compare_digest() raises TypeError for str arguments containing non-ASCII
    # characters, so compare UTF-8 bytes instead. Both comparisons are always
    # evaluated so the response time does not reveal which field was wrong.
    username_ok = secrets.compare_digest(
        username.encode("utf-8"), settings.ADMIN_USERNAME.encode("utf-8")
    )
    password_ok = secrets.compare_digest(
        password.encode("utf-8"), settings.ADMIN_PASSWORD.encode("utf-8")
    )
    if not (username_ok and password_ok):
        body = f"""
        <div class="error">Invalid username or password.</div>
        <form method="post" action="/admin/login" autocomplete="off">
        <label for="username">Username</label>
        <input id="username" name="username" type="text" autocomplete="username" value="{escape(username)}">
        <label for="password">Password</label>
        <input id="password" name="password" type="password" autocomplete="current-password">
        <label class="row"><input type="checkbox" name="remember_me" __REMEMBER_ME_CHECKED__> Remember me</label>
        <button type="submit">Sign in</button>
        </form>
        """
        return _render_auth_page(
            request,
            "Admin Login",
            "Use the configured admin credentials to access the dashboard.",
            body,
            status_code=HTTP_401_UNAUTHORIZED,
        )

    expire = settings.ADMIN_SESSION_EXPIRE_SECONDS
    response = _dashboard_redirect()
    if remember_me == "on":
        # Remembered sessions last at least 30 days.
        expire = max(expire, 3600 * 24 * 30)
        response.set_cookie("remember_me", "on", expires=expire, path="/admin")
    else:
        response.delete_cookie("remember_me", path="/admin")

    # Same 32-hex-character shape as the previous uuid4().hex token, but taken
    # from the module intended for security tokens.
    token = secrets.token_hex(16)
    response.set_cookie(
        SESSION_COOKIE,
        token,
        expires=expire,
        path="/admin",
        httponly=True,
        samesite="lax",
    )
    await _admin_redis.set(f"{SESSION_PREFIX}{token}", settings.ADMIN_USERNAME, ex=expire)
    return response
|
|
|
|
|
|
@router.get("/logout", include_in_schema=False)
async def logout(request: Request):
    """Drop the Redis session (when possible), clear both cookies, go to login."""
    session_token = request.cookies.get(SESSION_COOKIE)
    redis_ready = _admin_redis is not None
    if session_token and redis_ready:
        await _admin_redis.delete(f"{SESSION_PREFIX}{session_token}")

    response = _login_redirect()
    for cookie_name in (SESSION_COOKIE, "remember_me"):
        response.delete_cookie(cookie_name, path="/admin")
    return response
|
|
|
|
|
|
@router.get("/password", include_in_schema=False)
async def password_view(request: Request):
    """Show the read-only password page explaining that rotation happens via the environment."""
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    signed_in_as = escape(admin.username)
    expiry_seconds = settings.ADMIN_SESSION_EXPIRE_SECONDS
    notice = f"""
    <p class="muted">Signed in as <strong>{signed_in_as}</strong>.</p>
    <p>Password changes are disabled in the UI for this deployment.</p>
    <p>Update <code>ADMIN_PASSWORD</code> in the server environment, then restart the app.</p>
    <p>Session expiry is currently set to <strong>{expiry_seconds}</strong> seconds.</p>
    <p><a href="/admin/dashboard">Back to dashboard</a></p>
    """
    return _render_auth_page(
        request,
        "Password Management",
        "Runtime password rotation is intentionally disabled.",
        notice,
    )
|
|
|
|
|
|
@router.post("/password", include_in_schema=False)
async def password_submit(
    request: Request,
    old_password: str = Form(...),
    new_password: str = Form(...),
    re_new_password: str = Form(...),
):
    """Reject password-change submissions with an HTTP 400 explanation page.

    The three form fields are accepted but never used.
    """
    # The submitted values are deliberately ignored.
    del old_password, new_password, re_new_password

    if not await _current_admin(request):
        return _login_redirect()

    notice = """
    <div class="error">Password rotation via UI is disabled.</div>
    <p>Update <code>ADMIN_PASSWORD</code> in the server environment, then restart the app.</p>
    <p><a href="/admin/dashboard">Back to dashboard</a></p>
    """
    return _render_auth_page(
        request,
        "Password Management",
        "Runtime password rotation is intentionally disabled.",
        notice,
        status_code=400,
    )
|
|
|
|
|
|
@router.get("/dashboard", include_in_schema=False)
async def dashboard_view(request: Request, db: AsyncSession = Depends(get_db)):
    """Render the dashboard with row counts for tryouts, items and sessions."""
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    async def _count(stmt) -> int:
        # A missing scalar result is reported as zero.
        return await db.scalar(stmt) or 0

    tryout_total = await _count(select(func.count()).select_from(Tryout))
    item_total = await _count(select(func.count()).select_from(Item))
    session_total = await _count(select(func.count()).select_from(Session))
    completed_total = await _count(
        select(func.count()).select_from(Session).where(Session.is_completed.is_(True))
    )

    body = f"""
    <p class="muted">Signed in as <strong>{escape(admin.username)}</strong>.</p>
    <div class="grid">
    <div class="stat">Tryouts<strong>{tryout_total}</strong></div>
    <div class="stat">Items<strong>{item_total}</strong></div>
    <div class="stat">Sessions<strong>{session_total}</strong></div>
    <div class="stat">Completed Sessions<strong>{completed_total}</strong></div>
    </div>
    <p style="margin-top:20px"><a href="/admin/ai-playground">Open AI Playground</a></p>
    """
    return _render_admin_page("IRT Bank Soal Admin", "Dashboard", body)
|
|
|
|
|
|
@router.get("/websites", include_in_schema=False)
async def websites_view(request: Request, db: AsyncSession = Depends(get_db)):
    """List all websites (ascending id) together with the add-website form."""
    if not await _current_admin(request):
        return _login_redirect()

    rows = await db.execute(select(Website).order_by(Website.id.asc()))
    page_body = _websites_form_body(list(rows.scalars().all()))
    return _render_admin_page("Websites", "Websites", page_body)
|
|
|
|
|
|
@router.post("/websites", include_in_schema=False)
async def websites_submit(
    request: Request,
    db: AsyncSession = Depends(get_db),
    site_name: str = Form(...),
    site_url: str = Form(...),
):
    """Validate and insert a new website, then re-render the websites page.

    The name is stripped; the URL is stripped and loses trailing slashes. A
    blank name, a URL not starting with ``http://`` or ``https://``, or an
    IntegrityError on commit re-renders the form with an error and the values
    as submitted.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    async def _page(**form_state):
        # Every outcome shows the current website list plus a message.
        body = _websites_form_body(await _load_websites(db), **form_state)
        return _render_admin_page("Websites", "Websites", body)

    normalized_name = site_name.strip()
    normalized_url = site_url.strip().rstrip("/")

    if not normalized_name:
        return await _page(
            error="Website name is required.",
            site_name=site_name,
            site_url=site_url,
        )

    if not normalized_url.startswith(("http://", "https://")):
        return await _page(
            error="Website URL must start with http:// or https://.",
            site_name=site_name,
            site_url=site_url,
        )

    website = Website(site_name=normalized_name, site_url=normalized_url)
    db.add(website)
    try:
        await db.commit()
    except IntegrityError:
        await db.rollback()
        return await _page(
            error="Website URL already exists.",
            site_name=site_name,
            site_url=site_url,
        )

    # Reload explicitly before reading website.id, as the edit handler does:
    # if attributes expire on commit, a plain attribute read would need
    # implicit IO, which an AsyncSession cannot perform.
    await db.refresh(website)
    return await _page(success=f"Website added successfully with ID {website.id}.")
|
|
|
|
|
|
@router.get("/websites/{website_id}/edit", include_in_schema=False)
async def website_edit_view(
    website_id: int,
    request: Request,
    db: AsyncSession = Depends(get_db),
):
    """Render the edit form for one website, or the list with an error when it is missing."""
    if not await _current_admin(request):
        return _login_redirect()

    website = await db.get(Website, website_id)
    if website is not None:
        return _render_admin_page("Edit Website", "Edit Website", _website_edit_form_body(website))

    # Unknown id: fall back to the list page with a not-found message.
    rows = await db.execute(select(Website).order_by(Website.id.asc()))
    listing = _websites_form_body(
        list(rows.scalars().all()),
        error=f"Website not found: {website_id}",
    )
    return _render_admin_page("Websites", "Websites", listing)
|
|
|
|
|
|
@router.post("/websites/{website_id}/edit", include_in_schema=False)
async def website_edit_submit(
    website_id: int,
    request: Request,
    db: AsyncSession = Depends(get_db),
    site_name: str = Form(...),
    site_url: str = Form(...),
):
    """Validate and apply an edit to an existing website.

    An unknown id renders the websites list with an error. A blank name, a URL
    not starting with ``http://`` or ``https://``, or an IntegrityError on
    commit re-renders the edit form with an error and the values as submitted.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    website = await db.get(Website, website_id)
    if website is None:
        result = await db.execute(select(Website).order_by(Website.id.asc()))
        websites = list(result.scalars().all())
        body = _websites_form_body(websites, error=f"Website not found: {website_id}")
        return _render_admin_page("Websites", "Websites", body)

    def _edit_page(**form_state):
        body = _website_edit_form_body(website, **form_state)
        return _render_admin_page("Edit Website", "Edit Website", body)

    normalized_name = site_name.strip()
    normalized_url = site_url.strip().rstrip("/")

    if not normalized_name:
        return _edit_page(
            error="Website name is required.",
            site_name=site_name,
            site_url=site_url,
        )

    if not normalized_url.startswith(("http://", "https://")):
        return _edit_page(
            error="Website URL must start with http:// or https://.",
            site_name=site_name,
            site_url=site_url,
        )

    website.site_name = normalized_name
    website.site_url = normalized_url
    try:
        await db.commit()
    except IntegrityError:
        await db.rollback()
        # rollback() expires every loaded object. Reload the row so the form
        # renderer reads the stored values instead of triggering a lazy load,
        # which an AsyncSession cannot perform implicitly.
        await db.refresh(website)
        return _edit_page(
            error="Website URL already exists.",
            site_name=site_name,
            site_url=site_url,
        )

    await db.refresh(website)
    return _edit_page(success=f"Website #{website.id} updated successfully.")
|
|
|
|
|
|
@router.post("/websites/{website_id}/delete", include_in_schema=False)
async def website_delete_submit(
    website_id: int,
    request: Request,
    db: AsyncSession = Depends(get_db),
):
    """Delete a website and re-render the websites list with the outcome.

    An unknown id, or a delete rejected by the database with an
    IntegrityError, is reported as an error on the list page.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    async def _list_page(**form_state):
        result = await db.execute(select(Website).order_by(Website.id.asc()))
        body = _websites_form_body(list(result.scalars().all()), **form_state)
        return _render_admin_page("Websites", "Websites", body)

    website = await db.get(Website, website_id)
    if website is None:
        return await _list_page(error=f"Website not found: {website_id}")

    # Capture the label before deleting; the instance is unusable afterwards.
    deleted_label = f"{website.site_name} ({website.site_url})"
    await db.delete(website)
    try:
        await db.commit()
    except IntegrityError:
        # Matches the create/edit handlers: a constraint violation (for
        # example rows still referencing this website) becomes an error
        # message rather than an unhandled exception.
        await db.rollback()
        return await _list_page(
            error=f"Website could not be deleted because other records still reference it: {deleted_label}",
        )

    return await _list_page(success=f"Website deleted successfully: {deleted_label}")
|
|
|
|
|
|
@router.get("/tryout-import", include_in_schema=False)
async def tryout_import_view(request: Request, db: AsyncSession = Depends(get_db)):
    """Render the tryout import page with the website list and recent snapshots."""
    if not await _current_admin(request):
        return _login_redirect()

    website_rows = await _load_websites(db)
    snapshot_rows = await _recent_snapshots(db)
    page_body = _tryout_import_form_body(website_rows, snapshot_rows)
    return _render_admin_page("Tryout Import", "Tryout Import", page_body)
|
|
|
|
|
|
@router.post("/tryout-import/preview", include_in_schema=False)
async def tryout_import_preview(
    request: Request,
    db: AsyncSession = Depends(get_db),
    website_id: int = Form(...),
    file: UploadFile = File(...),
):
    """Parse an uploaded tryout JSON file and show an import preview.

    The upload must have a ``.json`` filename, decode as UTF-8 and parse as
    JSON; ``preview_tryout_json_import`` may additionally reject it with
    ``TryoutImportError``. Any failure re-renders the import page with an
    error. On success the decoded text is cached in Redis under a fresh
    preview token for ``IMPORT_PREVIEW_TTL_SECONDS`` and the preview is shown.
    """
    if not await _current_admin(request):
        return _login_redirect()

    websites = await _load_websites(db)
    snapshots = await _recent_snapshots(db)

    def _page(**form_state):
        # Every response re-renders the import page for the chosen website.
        body = _tryout_import_form_body(
            websites,
            snapshots,
            selected_website_id=website_id,
            **form_state,
        )
        return _render_admin_page("Tryout Import", "Tryout Import", body)

    has_json_name = bool(file.filename) and file.filename.lower().endswith(".json")
    if not has_json_name:
        return _page(error="File must be .json format.")

    try:
        raw_bytes = await file.read()
        payload_text = raw_bytes.decode("utf-8")
        payload = json.loads(payload_text)
    except UnicodeDecodeError:
        return _page(error="File must be UTF-8 encoded JSON.")
    except json.JSONDecodeError as exc:
        return _page(error=f"Invalid JSON file: {exc}")

    try:
        preview = await preview_tryout_json_import(payload, website_id, db)
    except TryoutImportError as exc:
        return _page(error=str(exc))

    # Cache the decoded upload so the confirm step can import it by token.
    preview_token = uuid.uuid4().hex
    cache_key = f"{IMPORT_PREVIEW_PREFIX}{preview_token}"
    await _admin_redis.set(cache_key, payload_text, ex=IMPORT_PREVIEW_TTL_SECONDS)

    return _page(
        preview=preview,
        preview_token=preview_token,
        upload_filename=file.filename or "",
    )
|
|
|
|
|
|
@router.post("/tryout-import", include_in_schema=False)
async def tryout_import_submit(
    request: Request,
    db: AsyncSession = Depends(get_db),
    website_id: int = Form(...),
    preview_token: str = Form(...),
):
    """Import the previously previewed tryout JSON identified by ``preview_token``.

    The cached payload is read from Redis; a missing entry is reported as an
    expired token. ``TryoutImportError`` rolls back and is shown on the page,
    any other exception rolls back and propagates. The cached payload is
    deleted once an import has been attempted, whatever the outcome.
    """
    if not await _current_admin(request):
        return _login_redirect()

    websites = await _load_websites(db)
    snapshots = await _recent_snapshots(db)

    def _page(snapshot_rows, **form_state):
        body = _tryout_import_form_body(
            websites,
            snapshot_rows,
            selected_website_id=website_id,
            **form_state,
        )
        return _render_admin_page("Tryout Import", "Tryout Import", body)

    cache_key = f"{IMPORT_PREVIEW_PREFIX}{preview_token}"
    payload_text = await _admin_redis.get(cache_key)
    if not payload_text:
        return _page(
            snapshots,
            error="Preview token expired. Upload the JSON again and preview before importing.",
        )

    try:
        payload = json.loads(payload_text)
        result = await import_tryout_json_snapshot(payload, website_id, db)
        await db.commit()
    except TryoutImportError as exc:
        await db.rollback()
        return _page(snapshots, error=str(exc))
    except Exception:
        await db.rollback()
        raise
    finally:
        # The token is single-use: drop the cached payload on every path.
        await _admin_redis.delete(cache_key)

    refreshed_snapshots = await _recent_snapshots(db)
    imported_tryouts = result.get("imported_tryouts") or []
    question_total = 0
    for entry in imported_tryouts:
        question_total += entry.get("question_count") or 0

    return _page(
        refreshed_snapshots,
        success=(
            f"Imported {len(imported_tryouts)} tryout snapshot(s) and archived {question_total} source question reference row(s)."
        ),
    )
|
|
|
|
|
|
@router.get("/snapshot-questions", include_in_schema=False)
async def snapshot_questions_view(
    request: Request,
    snapshot_id: int,
    db: AsyncSession = Depends(get_db),
):
    """Show the questions of one import snapshot, or the import page with an error when it is missing."""
    if not await _current_admin(request):
        return _login_redirect()

    snapshot = await db.get(TryoutImportSnapshot, snapshot_id)
    if snapshot is not None:
        questions, promoted_by_slot, _slot_map = await _load_snapshot_question_context(snapshot, db)
        page_body = _snapshot_questions_body(snapshot, questions, promoted_by_slot)
        return _render_admin_page("Snapshot Questions", "Snapshot Questions", page_body)

    # Unknown snapshot id: fall back to the import page.
    website_rows = await _load_websites(db)
    snapshot_rows = await _recent_snapshots(db)
    fallback_body = _tryout_import_form_body(
        website_rows,
        snapshot_rows,
        error=f"Snapshot not found: {snapshot_id}",
    )
    return _render_admin_page("Tryout Import", "Tryout Import", fallback_body)
|
|
|
|
|
|
@router.post("/snapshot-questions/promote-bulk", include_in_schema=False)
async def snapshot_question_promote_bulk(
    request: Request,
    snapshot_id: int = Form(...),
    snapshot_question_ids: list[int] | None = Form(None),
    db: AsyncSession = Depends(get_db),
):
    """Promote the selected snapshot questions to basis items in one request.

    Each selected question goes through ``_promote_snapshot_question_to_item``;
    the outcomes are tallied, committed together and summarised on the
    snapshot questions page. An unknown snapshot renders the import page with
    an error; an empty selection, or a selection matching no stored question,
    renders the questions page with an error.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    snapshot = await db.get(TryoutImportSnapshot, snapshot_id)
    if snapshot is None:
        websites = await _load_websites(db)
        snapshots = await _recent_snapshots(db)
        body = _tryout_import_form_body(
            websites,
            snapshots,
            error=f"Snapshot not found: {snapshot_id}",
        )
        return _render_admin_page("Tryout Import", "Tryout Import", body)

    async def _questions_page(**message):
        questions, promoted_items_by_slot, _ = await _load_snapshot_question_context(snapshot, db)
        body = _snapshot_questions_body(snapshot, questions, promoted_items_by_slot, **message)
        return _render_admin_page("Snapshot Questions", "Snapshot Questions", body)

    if not snapshot_question_ids:
        return await _questions_page(error="Select at least one snapshot question to promote.")

    # Process in id order so fallback slot numbers (max + 1) and the reported
    # "latest" item do not depend on the database's row order.
    question_result = await db.execute(
        select(TryoutSnapshotQuestion)
        .where(TryoutSnapshotQuestion.id.in_(snapshot_question_ids))
        .order_by(TryoutSnapshotQuestion.id.asc())
    )
    selected_questions = list(question_result.scalars().all())

    created_items: list[Item] = []
    existing_items: list[Item] = []
    missing_option_count = 0
    mismatch_count = 0

    for question in selected_questions:
        item, status = await _promote_snapshot_question_to_item(snapshot, question, db)
        if status == "created" and item is not None:
            created_items.append(item)
        elif status == "existing" and item is not None:
            existing_items.append(item)
        elif status == "missing_options":
            missing_option_count += 1
        elif status == "mismatch":
            mismatch_count += 1

    await db.commit()

    success_parts = []
    if created_items:
        success_parts.append(f"created {len(created_items)} item(s)")
    if existing_items:
        success_parts.append(f"reused {len(existing_items)} existing item(s)")
    if missing_option_count:
        success_parts.append(f"skipped {missing_option_count} question(s) with missing option text")
    if mismatch_count:
        success_parts.append(f"skipped {mismatch_count} mismatched question(s)")

    if not success_parts:
        # None of the submitted ids matched a stored question. Report that
        # instead of the empty "Bulk promote finished: ." summary.
        return await _questions_page(
            error="None of the selected snapshot questions could be found.",
        )

    success_message = "Bulk promote finished: " + ", ".join(success_parts) + "."
    if created_items:
        success_message += f" Latest basis item ID: {created_items[-1].id}."

    return await _questions_page(success=success_message)
|
|
|
|
|
|
@router.get("/calibration-status", include_in_schema=False)
async def calibration_status_view(request: Request, db: AsyncSession = Depends(get_db)):
    """Render one table row of calibration figures per tryout, ordered by tryout primary key."""
    if not await _current_admin(request):
        return _login_redirect()

    tryout_stmt = select(Tryout.tryout_id, Tryout.name, Tryout.website_id).order_by(Tryout.id)
    tryout_rows = (await db.execute(tryout_stmt)).all()

    table_rows = []
    for tryout_id, name, website_id in tryout_rows:
        report = await get_calibration_status(tryout_id, website_id, db)
        table_rows.append(
            [
                tryout_id,
                name,
                report["total_items"],
                report["calibrated_items"],
                f'{report["calibration_percentage"]:.2f}%',
                "Yes" if report["ready_for_irt"] else "No",
            ]
        )

    headers = ["Tryout ID", "Name", "Total Items", "Calibrated", "Calibration %", "Ready for IRT"]
    return _render_admin_page("Calibration Status", "Calibration Status", _table(headers, table_rows))
|
|
|
|
|
|
@router.get("/item-statistics", include_in_schema=False)
async def item_statistics_view(request: Request, db: AsyncSession = Depends(get_db)):
    """Render per-level item statistics computed from up to ten items per level.

    For each distinct item level the first ten items by slot are loaded and
    summarised: item count, calibrated count and percentage, summed
    calibration sample size, and mean ``ctt_p`` (missing values count as 0).
    """
    if not await _current_admin(request):
        return _login_redirect()

    level_rows = await db.execute(select(Item.level).distinct())

    table_rows = []
    for level in level_rows.scalars().all():
        # NOTE(review): limit(10) caps every figure below, including the
        # "Total Items" column, at ten items per level - confirm this is intended.
        sample_stmt = select(Item).where(Item.level == level).order_by(Item.slot).limit(10)
        sample = (await db.execute(sample_stmt)).scalars().all()
        sample_size = len(sample)

        response_total = 0
        calibrated_total = 0
        p_value_total = 0
        for entry in sample:
            response_total += entry.calibration_sample_size or 0
            if entry.calibrated:
                calibrated_total += 1
            p_value_total += entry.ctt_p or 0

        if sample_size:
            calibrated_pct = calibrated_total / sample_size * 100
            mean_p_value = p_value_total / sample_size
        else:
            calibrated_pct = 0
            mean_p_value = 0

        table_rows.append(
            [
                level,
                sample_size,
                calibrated_total,
                f"{calibrated_pct:.2f}%",
                response_total,
                f"{mean_p_value:.4f}",
            ]
        )

    headers = ["Level", "Total Items", "Calibrated", "Calibration %", "Responses", "Avg Correctness"]
    return _render_admin_page("Item Statistics", "Item Statistics", _table(headers, table_rows))
|
|
|
|
|
|
@router.get("/session-overview", include_in_schema=False)
async def session_overview_view(request: Request, db: AsyncSession = Depends(get_db)):
    """Render a table of the 50 most recently created sessions."""
    if not await _current_admin(request):
        return _login_redirect()

    recent_stmt = select(Session).order_by(Session.created_at.desc()).limit(50)
    recent_sessions = (await db.execute(recent_stmt)).scalars().all()

    table_rows = []
    for entry in recent_sessions:
        completed_label = "Yes" if entry.is_completed else "No"
        table_rows.append(
            [
                entry.session_id,
                entry.wp_user_id,
                entry.tryout_id,
                completed_label,
                entry.scoring_mode_used,
                entry.total_benar,
                entry.NM,
                entry.NN,
                entry.theta,
            ]
        )

    headers = ["Session ID", "WP User", "Tryout", "Completed", "Mode", "Benar", "NM", "NN", "Theta"]
    return _render_admin_page("Session Overview", "Session Overview", _table(headers, table_rows))
|
|
|
|
|
|
@router.get("/basis-items", include_in_schema=False)
async def basis_items_view(request: Request, db: AsyncSession = Depends(get_db)):
    """List up to 200 non-AI "sedang" items, most recently updated first."""
    if not await _current_admin(request):
        return _login_redirect()

    listing_stmt = (
        select(Item)
        .where(Item.level == "sedang", Item.generated_by != "ai")
        .order_by(Item.updated_at.desc(), Item.id.desc())
        .limit(200)
    )
    rows = await db.execute(listing_stmt)
    page_body = _basis_items_list_body(list(rows.scalars().all()))
    return _render_admin_page("Basis Items", "Basis Items", page_body)
|
|
|
|
|
|
@router.get("/basis-items/{basis_item_id}", include_in_schema=False)
async def basis_item_workspace_view(
    basis_item_id: int,
    request: Request,
    db: AsyncSession = Depends(get_db),
):
    """Render the workspace for one basis item: its generation runs and AI variants.

    Variants can be narrowed with the ``status``, ``level`` (``mudah`` or
    ``sulit``), ``run_id`` and ``min_frequency`` query parameters. A
    ``level``, ``run_id`` or ``min_frequency`` value that is not usable is
    ignored rather than rejected. When the id does not refer to a non-AI
    "sedang" item, the basis item list is rendered instead.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    status_filter = (request.query_params.get("status") or "").strip()
    level_filter = (request.query_params.get("level") or "").strip()
    run_id_filter = (request.query_params.get("run_id") or "").strip()
    min_frequency_filter = (request.query_params.get("min_frequency") or "").strip()
    filters = {
        "status": status_filter,
        "level": level_filter,
        "run_id": run_id_filter,
        "min_frequency": min_frequency_filter,
    }

    basis_item = await db.get(Item, basis_item_id)
    if basis_item is None or basis_item.generated_by == "ai" or basis_item.level != "sedang":
        result = await db.execute(
            select(Item)
            .where(Item.level == "sedang", Item.generated_by != "ai")
            .order_by(Item.updated_at.desc(), Item.id.desc())
            .limit(200)
        )
        body = _basis_items_list_body(list(result.scalars().all()))
        return _render_admin_page("Basis Items", "Basis Items", body)

    run_result = await db.execute(
        select(AIGenerationRun)
        .where(AIGenerationRun.basis_item_id == basis_item.id)
        .order_by(AIGenerationRun.id.desc())
        .limit(50)
    )
    runs = list(run_result.scalars().all())

    variant_result = await db.execute(
        select(Item)
        .where(Item.generated_by == "ai", Item.basis_item_id == basis_item.id)
        .order_by(Item.created_at.desc(), Item.id.desc())
        .limit(300)
    )
    variants = list(variant_result.scalars().all())

    if status_filter:
        variants = [item for item in variants if item.variant_status == status_filter]
    if level_filter in {"mudah", "sulit"}:
        variants = [item for item in variants if item.level == level_filter]

    # str.isdigit() is also true for characters such as "²" that int() cannot
    # parse, which turned a crafted query string into a server error. Use
    # isdecimal() and still guard the conversion (int() rejects extremely
    # long digit strings); an unusable value simply applies no filter.
    run_id = None
    if run_id_filter.isdecimal():
        try:
            run_id = int(run_id_filter)
        except ValueError:
            run_id = None
    if run_id is not None:
        variants = [item for item in variants if item.generation_run_id == run_id]

    # Usage metrics are computed for the variants left after the filters
    # above; the frequency filter below then narrows the list further.
    usage_metrics, family_stats = await _family_usage_stats(db, basis_item, variants)
    if min_frequency_filter:
        try:
            min_freq = float(min_frequency_filter)
        except ValueError:
            # A non-numeric threshold applies no frequency filter.
            min_freq = None
        if min_freq is not None:
            variants = [
                item
                for item in variants
                if usage_metrics.get(item.id, {}).get("frequency", 0.0) >= min_freq
            ]

    body = _basis_item_workspace_body(
        basis_item,
        runs,
        variants,
        usage_metrics,
        family_stats,
        filters,
    )
    return _render_admin_page(
        f"Basis Item #{basis_item.id}",
        f"Basis Item Workspace #{basis_item.id}",
        body,
    )
|
|
|
|
|
|
@router.post("/basis-items/{basis_item_id}/generate", include_in_schema=False)
async def basis_item_generate_submit(
    basis_item_id: int,
    request: Request,
    db: AsyncSession = Depends(get_db),
    target_level: str = Form(...),
    ai_model: str = Form(...),
    generation_count: int = Form(1),
    operator_notes: str = Form(""),
    include_note_for_admin: str | None = Form(None),
    include_note_in_prompt: str | None = Form(None),
):
    """Handle the "generate variants" form posted from a basis item workspace.

    Outcomes, in the order they are checked:

    * no admin session -> login redirect;
    * basis item missing, AI-generated, or not ``sedang`` -> redirect to the
      basis item list;
    * ``OPENROUTER_API_KEY`` unset -> workspace re-rendered with an error and
      the submitted form values;
    * target level not ``mudah``/``sulit``, unsupported model, or count
      outside 1..50 -> redirect back to the workspace (no message);
    * otherwise a generation run is created, the batch is generated, each
      returned question is saved as a ``draft`` variant, the session is
      committed, and the workspace is rendered with a summary banner.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    basis_item = await db.get(Item, basis_item_id)
    is_usable_basis = (
        basis_item is not None
        and not basis_item.generated_by == "ai"
        and not basis_item.level != "sedang"
    )
    if not is_usable_basis:
        return RedirectResponse(url="/admin/basis-items", status_code=HTTP_303_SEE_OTHER)

    # This handler always renders the workspace with the filter form cleared.
    cleared_filters = {"status": "", "level": "", "run_id": "", "min_frequency": ""}

    async def render_workspace(**body_kwargs: Any):
        """Reload runs, variants and usage stats, then render the workspace."""
        runs_cursor = await db.execute(
            select(AIGenerationRun)
            .where(AIGenerationRun.basis_item_id == basis_item.id)
            .order_by(AIGenerationRun.id.desc())
            .limit(50)
        )
        variants_cursor = await db.execute(
            select(Item)
            .where(Item.generated_by == "ai", Item.basis_item_id == basis_item.id)
            .order_by(Item.created_at.desc(), Item.id.desc())
            .limit(300)
        )
        run_list = list(runs_cursor.scalars().all())
        variant_list = list(variants_cursor.scalars().all())
        usage_metrics, family_stats = await _family_usage_stats(db, basis_item, variant_list)
        page_body = _basis_item_workspace_body(
            basis_item,
            run_list,
            variant_list,
            usage_metrics,
            family_stats,
            cleared_filters,
            **body_kwargs,
        )
        return _render_admin_page(
            f"Basis Item #{basis_item.id}",
            f"Basis Item Workspace #{basis_item.id}",
            page_body,
        )

    # HTML checkboxes submit the literal "on" when ticked and nothing otherwise.
    note_for_admin = include_note_for_admin == "on"
    note_in_prompt = include_note_in_prompt == "on"

    if not settings.OPENROUTER_API_KEY:
        return await render_workspace(
            error="OPENROUTER_API_KEY is not configured.",
            target_level=target_level,
            ai_model=ai_model,
            generation_count=str(generation_count),
            operator_notes=operator_notes,
            include_note_for_admin=note_for_admin,
            include_note_in_prompt=note_in_prompt,
        )

    # Short-circuiting keeps the original check order: level, model, count.
    request_is_valid = (
        target_level in {"mudah", "sulit"}
        and validate_ai_model(ai_model)
        and 1 <= generation_count <= 50
    )
    if not request_is_valid:
        return RedirectResponse(
            url=f"/admin/basis-items/{basis_item.id}",
            status_code=HTTP_303_SEE_OTHER,
        )

    # The note is stored on the run only when the operator opted in.
    stored_note = (operator_notes.strip() or None) if note_for_admin else None
    run_id = await create_generation_run(
        basis_item_id=basis_item.id,
        source_snapshot_question_id=basis_item.source_snapshot_question_id,
        target_level=target_level,
        requested_count=generation_count,
        model=ai_model,
        created_by=admin.username,
        operator_notes=stored_note,
        db=db,
    )
    generated = await generate_questions_batch(
        basis_item=basis_item,
        target_level=target_level,
        ai_model=ai_model,
        count=generation_count,
        operator_notes=operator_notes if note_in_prompt else None,
    )

    from app.schemas.ai import GeneratedQuestion

    saved_ids = []
    for candidate in generated:
        new_item_id = await save_ai_question(
            generated_data=GeneratedQuestion(
                stem=candidate.stem,
                options=candidate.options,
                correct=candidate.correct,
                explanation=candidate.explanation or None,
            ),
            tryout_id=basis_item.tryout_id,
            website_id=basis_item.website_id,
            basis_item_id=basis_item.id,
            slot=basis_item.slot,
            level=target_level,
            ai_model=ai_model,
            generation_run_id=run_id,
            source_snapshot_question_id=basis_item.source_snapshot_question_id,
            variant_status="draft",
            db=db,
        )
        if new_item_id:
            saved_ids.append(new_item_id)
    saved = len(saved_ids)

    await db.commit()

    counts = f"Requested={generation_count}, Generated={len(generated)}, Saved={saved}."
    if saved == 0:
        status_message = (
            f"Run #{run_id} failed to produce savable variants. {counts} "
            "Check model output/credentials and server logs."
        )
    else:
        status_message = f"Run #{run_id} finished. {counts}"

    return await render_workspace(
        error=status_message if saved == 0 else None,
        success=status_message if saved > 0 else None,
        target_level=target_level,
        ai_model=ai_model,
        generation_count=str(generation_count),
        include_note_for_admin=note_for_admin,
        include_note_in_prompt=note_in_prompt,
    )
|
|
|
|
|
|
@router.post("/basis-items/{basis_item_id}/review-bulk", include_in_schema=False)
async def basis_item_review_bulk(
    basis_item_id: int,
    request: Request,
    db: AsyncSession = Depends(get_db),
    item_ids: list[int] = Form([]),
    action: str = Form(...),
):
    """Apply one review status to the selected AI variants of a basis item.

    Only items that are AI-generated and belong to ``basis_item_id`` are
    updated; each gets the new ``variant_status`` plus the reviewing admin's
    username and a UTC timestamp. The workspace is then re-rendered.

    A success banner is shown only when at least one variant was actually
    updated. An unknown action, an empty selection, or a selection that
    matches no variants of this basis item produces an error banner instead,
    and the raw ``action`` value is never echoed unless it passed validation.

    Returns the login redirect without a session, a redirect to the basis
    item list when the basis item does not exist, otherwise the rendered
    workspace page.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    basis_item = await db.get(Item, basis_item_id)
    if basis_item is None:
        return RedirectResponse(url="/admin/basis-items", status_code=HTTP_303_SEE_OTHER)

    valid_actions = {"approved", "rejected", "archived", "stale", "active"}
    error: str | None = None
    success: str | None = None

    if action not in valid_actions:
        error = "Invalid review action."
    elif not item_ids:
        error = "Select at least one generated item."
    else:
        result = await db.execute(
            select(Item).where(
                Item.id.in_(item_ids),
                Item.generated_by == "ai",
                # Scope to this family so IDs from other basis items are ignored.
                Item.basis_item_id == basis_item.id,
            )
        )
        items = list(result.scalars().all())
        if not items:
            error = "No matching AI-generated items were found for the selected IDs."
        else:
            reviewed_at = datetime.now(timezone.utc)
            for item in items:
                item.variant_status = action
                item.reviewed_by = admin.username
                item.reviewed_at = reviewed_at
            await db.commit()
            success = f"Applied status '{action}' to selected variants."

    # Reload after the update so the rendered table shows the new statuses.
    run_result = await db.execute(
        select(AIGenerationRun)
        .where(AIGenerationRun.basis_item_id == basis_item.id)
        .order_by(AIGenerationRun.id.desc())
        .limit(50)
    )
    variant_result = await db.execute(
        select(Item)
        .where(Item.generated_by == "ai", Item.basis_item_id == basis_item.id)
        .order_by(Item.created_at.desc(), Item.id.desc())
        .limit(300)
    )
    runs = list(run_result.scalars().all())
    variants = list(variant_result.scalars().all())
    usage_metrics, family_stats = await _family_usage_stats(db, basis_item, variants)

    # The workspace is rendered with the filter form cleared.
    filters = {"status": "", "level": "", "run_id": "", "min_frequency": ""}
    body = _basis_item_workspace_body(
        basis_item,
        runs,
        variants,
        usage_metrics,
        family_stats,
        filters,
        error=error,
        success=success,
    )
    return _render_admin_page(
        f"Basis Item #{basis_item.id}",
        f"Basis Item Workspace #{basis_item.id}",
        body,
    )
|
|
|
|
|
|
def _ai_form_body(
    key_configured: bool,
    stats: dict[str, Any],
    error: str | None = None,
    success: str | None = None,
    generation_summary: dict[str, Any] | None = None,
    basis_items: list[Item] | None = None,
    generation_runs: list[AIGenerationRun] | None = None,
    generated_variants: list[Item] | None = None,
    basis_item_id: str = "",
    target_level: str = "mudah",
    ai_model: str = settings.OPENROUTER_MODEL_QWEN,
    generation_count: str = "1",
    operator_notes: str = "",
    include_note_for_admin: bool = True,
    include_note_in_prompt: bool = False,
) -> str:
    """Build the HTML fragment for the AI Playground page.

    Args:
        key_configured: Shown as Yes/No next to "OpenRouter key configured".
        stats: Mapping read for ``total_ai_items`` (defaults to 0).
        error: Optional error banner text (HTML-escaped here).
        success: Optional success banner text (HTML-escaped here).
        generation_summary: Optional mapping with ``run_id``,
            ``requested_count``, ``generated_count`` and ``saved_item_ids``;
            when given, a "Latest Generation Run" panel is rendered.
        basis_items: Items listed in the basis item table; when empty or
            ``None`` a call-out with the "Seed Demo Basis Item" form is shown.
        generation_runs: Runs listed in the "Recent Generation Runs" table.
        generated_variants: Items listed in the bulk-review table.
        basis_item_id, target_level, ai_model, generation_count,
        operator_notes, include_note_for_admin, include_note_in_prompt:
            Values used to pre-fill the generation form.

    Returns:
        The page body as an HTML string.
    """
    error_html = f'<div class="error">{escape(error)}</div>' if error else ""
    success_html = f'<div class="success">{escape(success)}</div>' if success else ""

    options_html = "".join(
        f'<option value="{escape(model)}" {"selected" if model == ai_model else ""}>{escape(label)}</option>'
        for model, label in SUPPORTED_MODELS.items()
    )

    basis_items = basis_items or []
    generation_runs = generation_runs or []
    generated_variants = generated_variants or []

    basis_table = _table(
        ["Item ID", "Tryout", "Slot", "Website", "Stem"],
        [
            [item.id, item.tryout_id, item.slot, item.website_id, _truncate(item.stem, 90)]
            for item in basis_items
        ],
    )

    seed_callout = ""
    if not basis_items:
        seed_callout = """
        <div class="success">
            No <code>sedang</code> basis items found yet. Seed one demo website, tryout, and basis item to test AI generation immediately.
        </div>
        <form method="post" action="/admin/ai-playground/seed-demo">
            <button type="submit">Seed Demo Basis Item</button>
        </form>
        """

    summary_html = ""
    if generation_summary:
        saved_item_ids = generation_summary.get("saved_item_ids") or []
        summary_html = f"""
        <h3 style="margin-top:24px">Latest Generation Run</h3>
        <div class="grid">
            <div class="stat">Run ID<strong>{generation_summary.get("run_id", "-")}</strong></div>
            <div class="stat">Requested<strong>{generation_summary.get("requested_count", 0)}</strong></div>
            <div class="stat">Generated<strong>{generation_summary.get("generated_count", 0)}</strong></div>
            <div class="stat">Saved<strong>{len(saved_item_ids)}</strong></div>
        </div>
        <p class="muted">Each saved output starts as <code>draft</code>. Review per item below to approve, reject, archive, stale, or activate.</p>
        """

    runs_table = _table(
        ["Run ID", "Basis Item", "Target", "Requested", "Model", "Created By", "Created At"],
        [
            [
                run.id,
                run.basis_item_id,
                run.target_level,
                run.requested_count,
                run.model,
                run.created_by,
                str(run.created_at),
            ]
            for run in generation_runs
        ],
    )

    # html.escape() requires a str, so nullable columns fall back to "-"
    # the same way the model column does.
    variant_rows = [
        "<tr>"
        f'<td><input type="checkbox" name="item_ids" value="{item.id}"></td>'
        f"<td>{item.id}</td>"
        f'<td>{item.generation_run_id or "-"}</td>'
        f'<td>{item.basis_item_id or "-"}</td>'
        f'<td>{escape(item.level or "-")}</td>'
        f'<td>{escape(item.variant_status or "-")}</td>'
        f'<td>{escape(item.ai_model or "-")}</td>'
        f"<td>{escape(_truncate(item.stem, 100))}</td>"
        f"<td>{escape(str(item.created_at))}</td>"
        "</tr>"
        for item in generated_variants
    ]

    # The selector sits inside a double-quoted onclick attribute, so its inner
    # double quotes must be written as &quot; or they would end the attribute.
    select_all_js = (
        "document.querySelectorAll('input[name=&quot;item_ids&quot;]')"
        ".forEach(el => el.checked = this.checked)"
    )
    variants_table = (
        '<form method="post" action="/admin/ai-playground/review-bulk">'
        '<div class="actions" style="margin:16px 0">'
        '<select name="action" style="max-width:260px">'
        '<option value="approved">Approve selected</option>'
        '<option value="rejected">Reject selected</option>'
        '<option value="archived">Archive selected</option>'
        '<option value="stale">Mark stale</option>'
        '<option value="active">Activate selected</option>'
        "</select>"
        '<button type="submit">Apply</button>'
        "</div>"
        f'<table><thead><tr><th><input type="checkbox" onclick="{select_all_js}"></th>'
        "<th>Item ID</th><th>Run ID</th><th>Basis</th><th>Level</th><th>Status</th>"
        "<th>Model</th><th>Stem</th><th>Created At</th></tr></thead><tbody>"
        + ("".join(variant_rows) if variant_rows else '<tr><td colspan="9">No AI-generated variants yet.</td></tr>')
        + "</tbody></table></form>"
    )

    return f"""
    <p class="muted">OpenRouter key configured: <strong>{"Yes" if key_configured else "No"}</strong></p>
    <p class="muted">Total AI-generated items: <strong>{stats.get("total_ai_items", 0)}</strong></p>
    <p class="muted">Hybrid workflow: one run can generate one or many variants; each item remains independently reviewable.</p>
    {success_html}
    {error_html}
    {seed_callout}
    <form method="post" action="/admin/ai-playground" autocomplete="off">
        <label for="basis_item_id">Basis Item ID</label>
        <input id="basis_item_id" name="basis_item_id" type="number" value="{escape(basis_item_id)}">
        <label for="target_level">Target Level</label>
        <select id="target_level" name="target_level" style="width:100%;box-sizing:border-box;border:1px solid #cbd5e1;border-radius:10px;padding:12px 14px;font-size:15px;">
            <option value="mudah" {"selected" if target_level == "mudah" else ""}>mudah</option>
            <option value="sulit" {"selected" if target_level == "sulit" else ""}>sulit</option>
        </select>
        <label for="ai_model">Model</label>
        <select id="ai_model" name="ai_model" style="width:100%;box-sizing:border-box;border:1px solid #cbd5e1;border-radius:10px;padding:12px 14px;font-size:15px;">
            {options_html}
        </select>
        <label for="generation_count">Generate Count</label>
        <input id="generation_count" name="generation_count" type="number" min="1" max="50" value="{escape(generation_count)}">
        <p class="muted">Recommended: 1-3 variants per run. Larger runs can increase overlap and review burden. Backend safety cap: 50.</p>
        <label for="operator_notes">Optional Notes (style hints)</label>
        <textarea id="operator_notes" name="operator_notes" rows="3" placeholder="Optional generation note for this run">{escape(operator_notes)}</textarea>
        <label class="row"><input type="checkbox" name="include_note_for_admin" {"checked" if include_note_for_admin else ""}> Save note for admin team (visible in run history)</label>
        <label class="row"><input type="checkbox" name="include_note_in_prompt" {"checked" if include_note_in_prompt else ""}> Include note in AI prompt payload</label>
        <p class="muted" style="margin-top:6px;">Example note: <code>Use simple wording, one-step arithmetic, avoid trick options.</code></p>
        <button type="submit">Generate Run</button>
    </form>
    <h3 style="margin-top:24px">Available Sedang Basis Items</h3>
    <p class="muted">The generator needs a <code>sedang</code> item. Use one of these IDs, or seed demo data if the table is empty.</p>
    {basis_table}
    {summary_html}
    <h3 style="margin-top:24px">Recent Generation Runs</h3>
    {runs_table}
    <h3 style="margin-top:24px">Generated Variants (Review Queue)</h3>
    <p class="muted">Use bulk review actions to move items from draft to approved/active, or reject/archive/stale.</p>
    {variants_table}
    """
|
|
|
|
|
|
@router.get("/ai-playground", include_in_schema=False)
async def ai_playground_view(request: Request, db: AsyncSession = Depends(get_db)):
    """Render the AI Playground page.

    The optional ``basis_item_id`` query parameter pre-fills the form and,
    when it is a plain ASCII integer, is passed to
    ``_recent_generated_variants`` as ``basis_item_id``. Any other value is
    still echoed into the form but leaves that argument as ``None``.

    Returns the login redirect when there is no admin session.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    stats = await get_ai_stats(db)
    basis_items = await _basis_items_for_playground(db)
    basis_item_id = request.query_params.get("basis_item_id", "")
    generation_runs = await _recent_generation_runs(db)

    # str.isdigit() alone also accepts characters such as "²" that int()
    # rejects with ValueError, so restrict the check to ASCII digits.
    raw_id = str(basis_item_id or "")
    selected_basis_item_id: int | None = None
    if raw_id.isascii() and raw_id.isdigit():
        selected_basis_item_id = int(raw_id)

    generated_variants = await _recent_generated_variants(
        db,
        basis_item_id=selected_basis_item_id,
    )
    body = _ai_form_body(
        bool(settings.OPENROUTER_API_KEY),
        stats,
        basis_items=basis_items,
        generation_runs=generation_runs,
        generated_variants=generated_variants,
        basis_item_id=raw_id,
    )
    return _render_admin_page("AI Playground", "AI Playground", body)
|
|
|
|
|
|
@router.post("/ai-playground/seed-demo", include_in_schema=False)
async def ai_playground_seed_demo(request: Request, db: AsyncSession = Depends(get_db)):
    """Ensure a demo basis item exists, then render the AI Playground.

    The item comes from ``_find_or_create_demo_basis_item``; its id is
    pre-filled in the form and named in the success banner. Returns the login
    redirect when there is no admin session.
    """
    if not await _current_admin(request):
        return _login_redirect()

    demo_item = await _find_or_create_demo_basis_item(db)

    # Page data is loaded after seeding, in the same order as the other views.
    page_stats = await get_ai_stats(db)
    listed_basis_items = await _basis_items_for_playground(db)
    recent_runs = await _recent_generation_runs(db)
    recent_variants = await _recent_generated_variants(db)

    banner = (
        f"Demo basis item is ready: item #{demo_item.id}, "
        f"tryout {demo_item.tryout_id}, slot {demo_item.slot}."
    )
    page_body = _ai_form_body(
        bool(settings.OPENROUTER_API_KEY),
        page_stats,
        success=banner,
        basis_items=listed_basis_items,
        generation_runs=recent_runs,
        generated_variants=recent_variants,
        basis_item_id=str(demo_item.id),
    )
    return _render_admin_page("AI Playground", "AI Playground", page_body)
|
|
|
|
|
|
@router.post("/ai-playground", include_in_schema=False)
async def ai_playground_submit(
    request: Request,
    db: AsyncSession = Depends(get_db),
    basis_item_id: int = Form(...),
    target_level: str = Form(...),
    ai_model: str = Form(...),
    generation_count: int = Form(1),
    operator_notes: str = Form(""),
    include_note_for_admin: str | None = Form(None),
    include_note_in_prompt: str | None = Form(None),
):
    """Run an AI generation request submitted from the playground form.

    Checks are made in this order, and the first failure re-renders the page
    with an error banner and the submitted values: OpenRouter key configured,
    target level is ``mudah``/``sulit``, model supported, basis item exists,
    basis item is ``sedang``, count within 1..50.

    On success a generation run is created, the batch is generated, each
    returned question is saved as a ``draft`` item, the session is committed,
    and the page is rendered with refreshed data. If nothing was saved an
    error banner is shown instead of the run summary.

    Returns the login redirect when there is no admin session.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    stats = await get_ai_stats(db)
    basis_items = await _basis_items_for_playground(db)
    generation_runs = await _recent_generation_runs(db)
    generated_variants = await _recent_generated_variants(db)

    # HTML checkboxes submit the literal "on" when ticked and nothing otherwise.
    note_for_admin = include_note_for_admin == "on"
    note_in_prompt = include_note_in_prompt == "on"

    # Form values echoed back on every rendering of the page.
    submitted = {
        "basis_item_id": str(basis_item_id),
        "target_level": target_level,
        "ai_model": ai_model,
        "generation_count": str(generation_count),
        "operator_notes": operator_notes,
        "include_note_for_admin": note_for_admin,
        "include_note_in_prompt": note_in_prompt,
    }

    def reject(message: str, *, key_configured: bool = True):
        """Render the page with an error banner and the pre-loaded data."""
        page_body = _ai_form_body(
            key_configured,
            stats,
            error=message,
            basis_items=basis_items,
            generation_runs=generation_runs,
            generated_variants=generated_variants,
            **submitted,
        )
        return _render_admin_page("AI Playground", "AI Playground", page_body)

    if not settings.OPENROUTER_API_KEY:
        return reject(
            "OPENROUTER_API_KEY is not configured in the environment.",
            key_configured=False,
        )
    if target_level not in {"mudah", "sulit"}:
        return reject("Target level must be mudah or sulit.")
    if not validate_ai_model(ai_model):
        return reject("Unsupported AI model.")

    lookup = await db.execute(select(Item).where(Item.id == basis_item_id))
    basis_item = lookup.scalar_one_or_none()
    if not basis_item:
        return reject(f"Basis item not found: {basis_item_id}")
    if basis_item.level != "sedang":
        return reject(f"Basis item must be sedang level, got: {basis_item.level}")
    if not 1 <= generation_count <= 50:
        return reject("Generate count must be between 1 and 50.")

    # The note is stored on the run only when the operator opted in.
    stored_note = (operator_notes.strip() or None) if note_for_admin else None
    run_id = await create_generation_run(
        basis_item_id=basis_item.id,
        source_snapshot_question_id=basis_item.source_snapshot_question_id,
        target_level=target_level,
        requested_count=generation_count,
        model=ai_model,
        created_by=admin.username,
        operator_notes=stored_note,
        db=db,
    )

    generated = await generate_questions_batch(
        basis_item=basis_item,
        target_level=target_level,
        ai_model=ai_model,
        count=generation_count,
        operator_notes=operator_notes if note_in_prompt else None,
    )

    saved_item_ids: list[int] = []
    from app.schemas.ai import GeneratedQuestion

    for candidate in generated:
        new_item_id = await save_ai_question(
            generated_data=GeneratedQuestion(
                stem=candidate.stem,
                options=candidate.options,
                correct=candidate.correct,
                explanation=candidate.explanation or None,
            ),
            tryout_id=basis_item.tryout_id,
            website_id=basis_item.website_id,
            basis_item_id=basis_item.id,
            slot=basis_item.slot,
            level=target_level,
            ai_model=ai_model,
            generation_run_id=run_id,
            source_snapshot_question_id=basis_item.source_snapshot_question_id,
            variant_status="draft",
            db=db,
        )
        if new_item_id:
            saved_item_ids.append(new_item_id)

    await db.commit()
    updated_stats = await get_ai_stats(db)
    updated_basis_items = await _basis_items_for_playground(db)
    updated_runs = await _recent_generation_runs(db)
    updated_variants = await _recent_generated_variants(db)

    if saved_item_ids:
        outcome: dict[str, Any] = {
            "success": f"Generation run #{run_id} completed. Saved {len(saved_item_ids)} item(s).",
            "generation_summary": {
                "run_id": run_id,
                "requested_count": generation_count,
                "generated_count": len(generated),
                "saved_item_ids": saved_item_ids,
            },
        }
    else:
        outcome = {
            "error": "Generation run completed but no items were saved. Check model output and logs.",
        }

    body = _ai_form_body(
        True,
        updated_stats,
        basis_items=updated_basis_items,
        generation_runs=updated_runs,
        generated_variants=updated_variants,
        **outcome,
        **submitted,
    )
    return _render_admin_page("AI Playground", "AI Playground", body)
|
|
|
|
|
|
@router.post("/ai-playground/save", include_in_schema=False)
async def ai_playground_save(
    request: Request,
    db: AsyncSession = Depends(get_db),
    basis_item_id: int = Form(...),
    tryout_id: str = Form(...),
    website_id: int = Form(...),
    slot: int = Form(...),
    target_level: str = Form(...),
    ai_model: str = Form(...),
    stem: str = Form(...),
    options_json: str = Form(...),
    correct: str = Form(...),
    explanation: str = Form(""),
):
    """Save one generated question, posted as form fields, as a draft item.

    ``options_json`` must be a JSON document; it is decoded and handed to
    ``GeneratedQuestion`` together with the other question fields. The item is
    stored through ``save_ai_question`` with ``variant_status="draft"`` and the
    session is committed.

    The page is re-rendered with an error banner when the target level is not
    ``mudah``/``sulit``, the model is not accepted by ``validate_ai_model``
    (the same check ``ai_playground_submit`` applies), the options payload is
    not valid JSON, or ``save_ai_question`` returns no item id. Every
    rendering includes the recent runs and the review queue so a freshly
    saved draft is visible straight away.

    Returns the login redirect when there is no admin session.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    key_configured = bool(settings.OPENROUTER_API_KEY)
    stats = await get_ai_stats(db)
    basis_items = await _basis_items_for_playground(db)
    generation_runs = await _recent_generation_runs(db)
    generated_variants = await _recent_generated_variants(db)

    def reject(message: str):
        """Render the page with an error banner and the pre-loaded data."""
        page_body = _ai_form_body(
            key_configured,
            stats,
            error=message,
            basis_items=basis_items,
            generation_runs=generation_runs,
            generated_variants=generated_variants,
        )
        return _render_admin_page("AI Playground", "AI Playground", page_body)

    if target_level not in {"mudah", "sulit"}:
        return reject("Only mudah or sulit generated items can be saved from the playground.")

    if not validate_ai_model(ai_model):
        return reject("Unsupported AI model.")

    try:
        options = json.loads(options_json)
    except json.JSONDecodeError:
        return reject("Generated options payload is invalid.")

    from app.schemas.ai import GeneratedQuestion

    item_id = await save_ai_question(
        generated_data=GeneratedQuestion(
            stem=stem,
            options=options,
            correct=correct,
            explanation=explanation or None,
        ),
        tryout_id=tryout_id,
        website_id=website_id,
        basis_item_id=basis_item_id,
        slot=slot,
        level=target_level,
        ai_model=ai_model,
        variant_status="draft",
        db=db,
    )
    if not item_id:
        # Uses the data loaded before the failed save; no further queries here.
        return reject("Failed to save generated item. Check server logs for the database error.")

    await db.commit()

    # Reload everything so the counters and review queue include the new item.
    updated_stats = await get_ai_stats(db)
    updated_basis_items = await _basis_items_for_playground(db)
    updated_runs = await _recent_generation_runs(db)
    updated_variants = await _recent_generated_variants(db)
    body = _ai_form_body(
        key_configured,
        updated_stats,
        success=f"Generated item saved successfully as item #{item_id}.",
        basis_items=updated_basis_items,
        generation_runs=updated_runs,
        generated_variants=updated_variants,
        basis_item_id=str(basis_item_id),
        target_level=target_level,
        ai_model=ai_model,
    )
    return _render_admin_page("AI Playground", "AI Playground", body)
|
|
|
|
|
|
@router.post("/ai-playground/review-bulk", include_in_schema=False)
async def ai_playground_review_bulk(
    request: Request,
    db: AsyncSession = Depends(get_db),
    item_ids: list[int] = Form([]),
    action: str = Form(...),
):
    """Apply one review status to the selected AI-generated items.

    The page is re-rendered with an error banner when the action is unknown,
    nothing was selected, or none of the selected ids is an AI-generated
    item. Otherwise each matching item gets the new ``variant_status``, the
    reviewing admin's username and a UTC timestamp; the session is committed
    and the page is rendered with refreshed data.

    Returns the login redirect when there is no admin session.
    """
    admin = await _current_admin(request)
    if not admin:
        return _login_redirect()

    allowed_statuses = {"approved", "rejected", "archived", "stale", "active"}
    stats = await get_ai_stats(db)
    basis_items = await _basis_items_for_playground(db)
    generation_runs = await _recent_generation_runs(db)
    generated_variants = await _recent_generated_variants(db)

    def reject(message: str):
        """Render the page with an error banner and the pre-loaded data."""
        page_body = _ai_form_body(
            bool(settings.OPENROUTER_API_KEY),
            stats,
            error=message,
            basis_items=basis_items,
            generation_runs=generation_runs,
            generated_variants=generated_variants,
        )
        return _render_admin_page("AI Playground", "AI Playground", page_body)

    if action not in allowed_statuses:
        return reject("Invalid review action.")
    if not item_ids:
        return reject("Select at least one generated item.")

    # Only AI-generated rows may be reviewed through this endpoint.
    selection = await db.execute(
        select(Item).where(Item.id.in_(item_ids), Item.generated_by == "ai")
    )
    items = list(selection.scalars().all())
    if not items:
        return reject("No matching AI-generated items were found for the selected IDs.")

    # One timestamp is shared by the whole batch.
    stamp = datetime.now(timezone.utc)
    for reviewed in items:
        reviewed.variant_status = action
        reviewed.reviewed_by = admin.username
        reviewed.reviewed_at = stamp

    await db.commit()

    updated_stats = await get_ai_stats(db)
    updated_basis_items = await _basis_items_for_playground(db)
    updated_runs = await _recent_generation_runs(db)
    updated_variants = await _recent_generated_variants(db)
    body = _ai_form_body(
        bool(settings.OPENROUTER_API_KEY),
        updated_stats,
        success=f"Updated {len(items)} item(s) to status '{action}'.",
        basis_items=updated_basis_items,
        generation_runs=updated_runs,
        generated_variants=updated_variants,
    )
    return _render_admin_page("AI Playground", "AI Playground", body)
|
|
|
|
|
|
@router.get("/tryout/list", include_in_schema=False)
@router.get("/item/list", include_in_schema=False)
@router.get("/user/list", include_in_schema=False)
@router.get("/session/list", include_in_schema=False)
@router.get("/tryoutstats/list", include_in_schema=False)
async def legacy_admin_paths(request: Request):
    """Redirect the listed legacy admin list URLs.

    Requests with an admin session get the dashboard redirect; all others get
    the login redirect.
    """
    if await _current_admin(request):
        return _dashboard_redirect()
    return _login_redirect()
|