""" Plain FastAPI admin UI backed by SQLAlchemy and Redis sessions. This replaces the previous fastapi-admin runtime path, which depended on Tortoise-oriented internals that do not match this project. """ import secrets import uuid from dataclasses import dataclass from datetime import datetime, timezone from html import escape, unescape import json import re from typing import Any import aioredis from fastapi import APIRouter, Depends, File, Form, Request, UploadFile from sqlalchemy import func, select from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession from starlette.responses import HTMLResponse, RedirectResponse from starlette.status import HTTP_303_SEE_OTHER, HTTP_401_UNAUTHORIZED, HTTP_429_TOO_MANY_REQUESTS from app.core.config import get_settings from app.database import get_db from app.models import ( AIGenerationRun, Item, Session, Tryout, TryoutImportSnapshot, TryoutSnapshotQuestion, UserAnswer, Website, ) from app.services.ai_generation import ( create_generation_run, generate_questions_batch, get_ai_stats, save_ai_question, ) from app.services.irt_calibration import get_calibration_status from app.services.tryout_json_import import ( TryoutImportError, import_tryout_json_snapshot, preview_tryout_json_import, ) settings = get_settings() router = APIRouter(prefix="/admin", tags=["admin-web"]) SESSION_COOKIE = "access_token" CSRF_COOKIE = "admin_csrf_token" SESSION_PREFIX = "admin:session:" IMPORT_PREVIEW_PREFIX = "admin:import-preview:" IMPORT_PREVIEW_TTL_SECONDS = 900 LOGIN_RATE_LIMIT_PREFIX = "admin:login:attempts:" LOGIN_RATE_LIMIT_MAX_ATTEMPTS = 10 LOGIN_RATE_LIMIT_WINDOW_SECONDS = 300 _admin_redis = None @dataclass class AdminPrincipal: username: str async def configure_admin_web() -> None: global _admin_redis if _admin_redis is not None: return if not settings.ADMIN_USERNAME or not settings.ADMIN_PASSWORD: raise RuntimeError("ENABLE_ADMIN=true requires ADMIN_USERNAME and ADMIN_PASSWORD to be set.") _admin_redis = aioredis.from_url( settings.REDIS_URL, encoding="utf-8", decode_responses=True, ) async def shutdown_admin_web() -> None: global _admin_redis if _admin_redis is None: return try: await _admin_redis.close() finally: _admin_redis = None async def _current_admin(request: Request) -> AdminPrincipal | None: if _admin_redis is None: return None token = request.cookies.get(SESSION_COOKIE) if not token: return None username = await _admin_redis.get(f"{SESSION_PREFIX}{token}") if not username: return None return AdminPrincipal(username=str(username)) def _login_redirect() -> RedirectResponse: return RedirectResponse(url="/admin/login", status_code=HTTP_303_SEE_OTHER) def _dashboard_redirect() -> RedirectResponse: return RedirectResponse(url="/admin/dashboard", status_code=HTTP_303_SEE_OTHER) def _render_auth_page( request: Request, title: str, subtitle: str, body: str, status_code: int = 200, ) -> HTMLResponse: remember_me_checked = "checked" if request.cookies.get("remember_me") == "on" else "" html = f""" {escape(title)}

{escape(title)}

{escape(subtitle)}

{body.replace("__REMEMBER_ME_CHECKED__", remember_me_checked)}
""" csrf_token = request.cookies.get(CSRF_COOKIE) or secrets.token_urlsafe(24) csrf_input = f'' html = re.sub( r'(]*method="post"[^>]*>)', r"\1" + csrf_input, html, flags=re.IGNORECASE, ) response = HTMLResponse(html, status_code=status_code) response.set_cookie( CSRF_COOKIE, csrf_token, path="/admin", httponly=False, secure=settings.ENVIRONMENT == "production", samesite="lax", ) return response def _render_admin_page(request: Request, title: str, page_title: str, body: str) -> HTMLResponse: html = f""" {escape(title)}

{escape(page_title)}

{body}
""" csrf_token = request.cookies.get(CSRF_COOKIE) or secrets.token_urlsafe(24) csrf_input = f'' html = re.sub( r'(]*method="post"[^>]*>)', r"\1" + csrf_input, html, flags=re.IGNORECASE, ) response = HTMLResponse(html) response.set_cookie( CSRF_COOKIE, csrf_token, path="/admin", httponly=False, secure=settings.ENVIRONMENT == "production", samesite="lax", ) return response def _verify_csrf(request: Request, csrf_token: str | None) -> None: cookie_token = request.cookies.get(CSRF_COOKIE) if not cookie_token or not csrf_token: raise HTTPException(status_code=403, detail="CSRF validation failed") if not secrets.compare_digest(cookie_token, csrf_token): raise HTTPException(status_code=403, detail="CSRF validation failed") async def _enforce_csrf(request: Request) -> None: form = await request.form() _verify_csrf(request, form.get("csrf_token")) async def _csrf_route_guard(request: Request) -> None: if request.method.upper() != "POST": return await _enforce_csrf(request) router.dependencies.append(Depends(_csrf_route_guard)) def _table(headers: list[str], rows: list[list[Any]]) -> str: head = "".join(f"{escape(str(header))}" for header in headers) body_rows = [] for row in rows: cols = "".join(f"{escape(str(value))}" for value in row) body_rows.append(f"{cols}") body = "".join(body_rows) or f"No data" return f"{head}{body}
" def _truncate(text: str | None, max_length: int = 120) -> str: if not text: return "" if len(text) <= max_length: return text return f"{text[: max_length - 3]}..." def _html_to_text(value: str | None) -> str: if not value: return "" text = re.sub(r"<[^>]+>", " ", value) text = unescape(text) text = re.sub(r"\s+", " ", text).strip() return text def _websites_form_body( websites: list[Website], error: str | None = None, success: str | None = None, site_name: str = "", site_url: str = "", ) -> str: error_html = f'
{escape(error)}
' if error else "" success_html = f'
{escape(success)}
' if success else "" body_rows = [] for website in websites: actions = f"""
Edit
""" body_rows.append( "" f"{website.id}" f"{escape(website.site_name)}" f"{escape(website.site_url)}" f"{actions}" "" ) if body_rows: websites_table = ( "" + "".join(body_rows) + "
IDNameURLActions
" ) else: websites_table = _table(["ID", "Name", "URL", "Actions"], []) return f"""

Register websites here so imports and tryout references can be tied to a known source site.

{success_html} {error_html}

Registered Websites

Use the website ID when importing read-only tryout snapshots.

{websites_table} """ def _website_edit_form_body( website: Website, error: str | None = None, success: str | None = None, site_name: str | None = None, site_url: str | None = None, ) -> str: error_html = f'
{escape(error)}
' if error else "" success_html = f'
{escape(success)}
' if success else "" display_name = website.site_name if site_name is None else site_name display_url = website.site_url if site_url is None else site_url return f"""

Website ID: {website.id}

{success_html} {error_html}
Back
""" def _tryout_import_form_body( websites: list[Website], recent_snapshots: list[TryoutImportSnapshot], error: str | None = None, success: str | None = None, selected_website_id: int | None = None, preview: dict[str, Any] | None = None, preview_token: str | None = None, upload_filename: str = "", ) -> str: error_html = f'
{escape(error)}
' if error else "" success_html = f'
{escape(success)}
' if success else "" website_options = [''] for website in websites: selected = "selected" if selected_website_id == website.id else "" website_options.append( f'' ) website_map = {website.id: website.site_name for website in websites} snapshot_rows = [] for snapshot in recent_snapshots: snapshot_rows.append( "" f"{snapshot.id}" f"{escape(website_map.get(snapshot.website_id, 'Unknown'))} (#{snapshot.website_id})" f"{escape(snapshot.source_tryout_id)}" f"{escape(snapshot.title)}" f"{snapshot.question_count}" f"{escape(str(snapshot.created_at))}" f"Browse" "" ) snapshots_table = ( "" + ("".join(snapshot_rows) if snapshot_rows else "") + "
Snapshot IDWebsiteTryout IDTitleQuestionsImported AtActions
No data
" ) preview_html = "" if preview: totals = preview.get("totals") or {} tryout_rows = [] for tryout in preview.get("tryouts") or []: diff = tryout.get("question_diff") or {} warnings = "; ".join(tryout.get("warnings") or []) or "-" tryout_rows.append( [ tryout.get("source_tryout_id"), tryout.get("title"), diff.get("total_questions", 0), diff.get("new_questions", 0), diff.get("updated_questions", 0), diff.get("unchanged_questions", 0), diff.get("removed_questions", 0), warnings, ] ) import_form = "" if preview_token and selected_website_id: import_form = f"""
""" preview_html = f"""

Preview Summary

File: {escape(upload_filename or "uploaded JSON")}

Tryouts{preview.get("tryout_count", 0)}
New Questions{totals.get("new_questions", 0)}
Updated Questions{totals.get("updated_questions", 0)}
Removed Questions{totals.get("removed_questions", 0)}
{_table( ["Tryout ID", "Title", "Total", "New", "Updated", "Unchanged", "Removed", "Warnings"], tryout_rows, )}
{import_form}
""" return f"""

Import Sejoli tryout JSON as read-only snapshot reference data. This does not create live item-bank questions.

Use this when the source tryout changes upstream. Re-import updates matching source question IDs, inserts new ones, and marks missing ones inactive.

{success_html} {error_html}
{preview_html}

Recent Snapshots

These are archived imports stored in PostgreSQL for traceability.

{snapshots_table} """ def _snapshot_slot_map(snapshot: TryoutImportSnapshot) -> dict[str, int]: slot_map: dict[str, int] = {} questions = (snapshot.raw_payload or {}).get("questions") or [] for index, question in enumerate(questions, start=1): source_question_id = str((question or {}).get("id") or "").strip() if source_question_id: slot_map[source_question_id] = index return slot_map def _snapshot_options_to_item_options(raw_options: list[dict[str, Any]] | list[Any]) -> dict[str, str]: item_options: dict[str, str] = {} for option in raw_options or []: if not isinstance(option, dict): continue increment = str(option.get("increment") or "").strip().upper() text = str(option.get("text") or option.get("label") or "").strip() if increment and text: item_options[increment] = text return item_options def _snapshot_questions_body( snapshot: TryoutImportSnapshot, questions: list[TryoutSnapshotQuestion], promoted_items_by_slot: dict[int, Item], error: str | None = None, success: str | None = None, ) -> str: error_html = f'
{escape(error)}
' if error else "" success_html = f'
{escape(success)}
' if success else "" slot_map = _snapshot_slot_map(snapshot) rows = [] for question in questions: slot = slot_map.get(question.source_question_id, 0) promoted_item = promoted_items_by_slot.get(slot) if promoted_item: select_html = "" action_html = ( f'Item #{promoted_item.id} already exists. ' f'Open in AI Playground' ) else: select_html = f'' action_html = "Ready to promote" rows.append( "" f"{select_html}" f"{slot or '-'}" f"{escape(question.source_question_id)}" f"{escape(question.correct_answer)}" f"{question.option_count}" f"{'Yes' if question.is_active else 'No'}" f"{escape(_truncate(question.question_title or question.question_html, 100))}" f"{action_html}" "" ) questions_table = ( "
" f"" "
" "" "
" "" + ("".join(rows) if rows else "") + "
el.checked = this.checked)\">SlotSource Question IDCorrectOptionsActiveStemAction
No data
" "
" ) return f"""

Snapshot ID: {snapshot.id} | Website: {snapshot.website_id} | Tryout: {escape(snapshot.source_tryout_id)}

Promote selected snapshot questions into the live items table as sedang basis items for AI generation.

{success_html} {error_html} {questions_table}

Back to Tryout Import

""" async def _basis_items_for_playground(db: AsyncSession, limit: int = 20) -> list[Item]: result = await db.execute( select(Item) .where(Item.level == "sedang") .order_by(Item.created_at.desc(), Item.id.desc()) .limit(limit) ) return list(result.scalars().all()) async def _recent_generation_runs(db: AsyncSession, limit: int = 20) -> list[AIGenerationRun]: result = await db.execute( select(AIGenerationRun).order_by(AIGenerationRun.id.desc()).limit(limit) ) return list(result.scalars().all()) async def _recent_generated_variants( db: AsyncSession, limit: int = 100, basis_item_id: int | None = None, ) -> list[Item]: stmt = select(Item).where(Item.generated_by == "ai") if basis_item_id is not None: stmt = stmt.where(Item.basis_item_id == basis_item_id) result = await db.execute( stmt.order_by(Item.created_at.desc(), Item.id.desc()).limit(limit) ) return list(result.scalars().all()) async def _usage_metrics_for_items( db: AsyncSession, item_ids: list[int], ) -> dict[int, dict[str, float]]: if not item_ids: return {} result = await db.execute( select( UserAnswer.item_id, func.count(UserAnswer.id).label("impressions"), func.count(func.distinct(UserAnswer.wp_user_id)).label("unique_users"), ) .where(UserAnswer.item_id.in_(item_ids)) .group_by(UserAnswer.item_id) ) metrics: dict[int, dict[str, float]] = {} for item_id, impressions, unique_users in result.all(): impressions_i = int(impressions or 0) unique_users_i = int(unique_users or 0) frequency = (impressions_i / unique_users_i) if unique_users_i else 0.0 metrics[int(item_id)] = { "impressions": float(impressions_i), "unique_users": float(unique_users_i), "frequency": float(frequency), } return metrics async def _family_usage_stats( db: AsyncSession, basis_item: Item, variants: list[Item], ) -> tuple[dict[int, dict[str, float]], dict[str, float]]: family_item_ids = [basis_item.id] + [item.id for item in variants] usage_metrics = await _usage_metrics_for_items(db, family_item_ids) family_impressions = int(sum(metric["impressions"] for metric in usage_metrics.values())) family_unique_users = int( await db.scalar( select(func.count(func.distinct(UserAnswer.wp_user_id))).where( UserAnswer.item_id.in_(family_item_ids) ) ) or 0 ) family_frequency = (family_impressions / family_unique_users) if family_unique_users else 0.0 return usage_metrics, { "impressions": float(family_impressions), "unique_users": float(family_unique_users), "frequency": float(family_frequency), } def _basis_items_list_body(items: list[Item]) -> str: rows = [] for item in items: rows.append( "" f"{item.id}" f"{escape(item.tryout_id)}" f"{item.slot}" f"{item.website_id}" f"{escape(_truncate(item.stem, 120))}" f"{item.source_snapshot_question_id or '-'}" f"Open Workspace" "" ) table = ( "" + ("".join(rows) if rows else "") + "
Item IDTryoutSlotWebsiteStemSource Snapshot QIDActions
No basis items found.
" ) return f"""

Basis items are canonical parent questions (sedang, non-AI). Open a workspace to generate and review AI child variants.

{table} """ def _basis_item_workspace_body( basis_item: Item, runs: list[AIGenerationRun], variants: list[Item], usage_by_item: dict[int, dict[str, float]], family_stats: dict[str, float], filters: dict[str, str], error: str | None = None, success: str | None = None, target_level: str = "mudah", ai_model: str = settings.OPENROUTER_MODEL_LLAMA, generation_count: str = "1", operator_notes: str = "", include_note_for_admin: bool = True, include_note_in_prompt: bool = False, ) -> str: error_html = f'
{escape(error)}
' if error else "" success_html = f'
{escape(success)}
' if success else "" status_filter = filters.get("status", "") level_filter = filters.get("level", "") min_frequency_filter = filters.get("min_frequency", "") run_id_filter = filters.get("run_id", "") run_rows = [ [ run.id, run.target_level, run.requested_count, run.model, run.created_by, str(run.created_at), ] for run in runs ] runs_table = _table( ["Run ID", "Target", "Requested", "Model", "Created By", "Created At"], run_rows, ) variant_rows = [] for item in variants: usage = usage_by_item.get(item.id, {"impressions": 0.0, "unique_users": 0.0, "frequency": 0.0}) options = item.options if isinstance(item.options, dict) else {} options_rows = "".join( f"{escape(str(key))}" f"{escape(str(value))}" for key, value in options.items() ) or "No options" review_html = ( "
" "Review full content" f"
" f"

Full Stem
{escape(_html_to_text(item.stem))}

" "" "" f"{options_rows}" "
OptionText
" f"

Correct Answer: {escape(item.correct_answer or '-')}

" f"

Explanation: {escape(_html_to_text(item.explanation) or '-')}

" "
" "
" ) variant_rows.append( "" f"" f"{item.id}" f"{item.generation_run_id or '-'}" f"{escape(item.level)}" f"{escape(item.variant_status)}" f"{escape(item.ai_model or '-')}" f"{int(usage['impressions'])}" f"{int(usage['unique_users'])}" f"{usage['frequency']:.2f}" f"{escape(_truncate(_html_to_text(item.stem), 130))}{review_html}" f"{escape(str(item.created_at))}" "" ) variants_table = ( f"
" "
" "" "" "
" "" + ("".join(variant_rows) if variant_rows else "") + "
el.checked = this.checked)\">Item IDRun IDLevelStatusModelImpressionsUnique UsersFrequencyStemCreated At
No generated variants yet for this parent.
" ) return f""" {success_html} {error_html}

Parent Summary

Parent Item: #{basis_item.id} | Tryout: {escape(basis_item.tryout_id)} | Slot: {basis_item.slot} | Website: {basis_item.website_id} | Source Snapshot QID: {basis_item.source_snapshot_question_id or '-'}

Family Usage: impressions={int(family_stats.get("impressions", 0.0))}, unique users={int(family_stats.get("unique_users", 0.0))}, frequency={family_stats.get("frequency", 0.0):.2f}

Stem: {escape(_truncate(_html_to_text(basis_item.stem), 260))}

Generate Variants

Create new AI child variants for this parent.

Recommended: 1-3 per run. Larger runs increase overlap and review burden.

Example note: Use clinical language, avoid negatives, keep stem under 40 words.

Filter Variants

Filter child variants shown in the review table below.

Reset

Child Variants for This Parent

Filtered variants shown: {len(variants)}

{variants_table}

Generation Runs for This Parent

Run history is reference/audit data and is intentionally separated from variant review workflow.

{runs_table}

Back to Basis Items

""" async def _find_or_create_demo_basis_item(db: AsyncSession) -> Item: result = await db.execute( select(Item) .where( Item.level == "sedang", Item.generated_by == "manual", Item.tryout_id == "demo-tryout", ) .order_by(Item.id.asc()) .limit(1) ) existing_item = result.scalar_one_or_none() if existing_item: return existing_item website_result = await db.execute( select(Website).where(Website.site_url == "https://demo.local").limit(1) ) website = website_result.scalar_one_or_none() if website is None: website = Website(site_url="https://demo.local", site_name="Demo Website") db.add(website) await db.flush() tryout_result = await db.execute( select(Tryout) .where(Tryout.website_id == website.id, Tryout.tryout_id == "demo-tryout") .limit(1) ) tryout = tryout_result.scalar_one_or_none() if tryout is None: tryout = Tryout( website_id=website.id, tryout_id="demo-tryout", name="Demo AI Playground Tryout", description="Seed data for the AI playground.", scoring_mode="ctt", selection_mode="fixed", normalization_mode="static", ai_generation_enabled=True, ) db.add(tryout) await db.flush() item = Item( tryout_id=tryout.tryout_id, website_id=website.id, slot=1, level="sedang", stem="Sebuah toko memberi diskon 20% untuk sebuah tas. Jika harga setelah diskon adalah Rp240.000, berapakah harga tas sebelum diskon?", options={ "A": "Rp260.000", "B": "Rp300.000", "C": "Rp320.000", "D": "Rp360.000", }, correct_answer="B", explanation="Harga setelah diskon 20% berarti 80% dari harga awal. Jadi harga awal = 240.000 / 0,8 = 300.000.", generated_by="manual", calibrated=False, calibration_sample_size=0, ) db.add(item) await db.flush() await db.commit() await db.refresh(item) return item async def _load_websites(db: AsyncSession) -> list[Website]: result = await db.execute(select(Website).order_by(Website.id.asc())) return list(result.scalars().all()) async def _recent_snapshots(db: AsyncSession, limit: int = 20) -> list[TryoutImportSnapshot]: result = await db.execute( select(TryoutImportSnapshot).order_by(TryoutImportSnapshot.id.desc()).limit(limit) ) return list(result.scalars().all()) async def _ensure_operational_tryout(snapshot: TryoutImportSnapshot, db: AsyncSession) -> Tryout: result = await db.execute( select(Tryout).where( Tryout.website_id == snapshot.website_id, Tryout.tryout_id == snapshot.source_tryout_id, ) ) tryout = result.scalar_one_or_none() if tryout: return tryout tryout = Tryout( website_id=snapshot.website_id, tryout_id=snapshot.source_tryout_id, name=snapshot.title, description=f"Operational tryout basis created from imported snapshot #{snapshot.id}.", scoring_mode="ctt", selection_mode="fixed", normalization_mode="static", ai_generation_enabled=True, ) db.add(tryout) await db.flush() return tryout async def _load_snapshot_question_context( snapshot: TryoutImportSnapshot, db: AsyncSession, ) -> tuple[list[TryoutSnapshotQuestion], dict[int, Item], dict[str, int]]: question_result = await db.execute( select(TryoutSnapshotQuestion) .where( TryoutSnapshotQuestion.website_id == snapshot.website_id, TryoutSnapshotQuestion.source_tryout_id == snapshot.source_tryout_id, ) .order_by(TryoutSnapshotQuestion.source_question_id.asc()) ) questions = list(question_result.scalars().all()) item_result = await db.execute( select(Item).where( Item.website_id == snapshot.website_id, Item.tryout_id == snapshot.source_tryout_id, Item.level == "sedang", ) ) promoted_items_by_slot = {item.slot: item for item in item_result.scalars().all()} slot_map = _snapshot_slot_map(snapshot) questions.sort(key=lambda row: (slot_map.get(row.source_question_id, 10**9), row.source_question_id)) return questions, promoted_items_by_slot, slot_map async def _promote_snapshot_question_to_item( snapshot: TryoutImportSnapshot, question: TryoutSnapshotQuestion, db: AsyncSession, ) -> tuple[Item | None, str]: if ( question.website_id != snapshot.website_id or question.source_tryout_id != snapshot.source_tryout_id ): return None, "mismatch" slot_map = _snapshot_slot_map(snapshot) slot = slot_map.get(question.source_question_id) if not slot: max_slot = ( await db.scalar( select(func.max(Item.slot)).where( Item.website_id == snapshot.website_id, Item.tryout_id == snapshot.source_tryout_id, Item.level == "sedang", ) ) or 0 ) slot = max_slot + 1 options = _snapshot_options_to_item_options(question.raw_options) if not options: return None, "missing_options" await _ensure_operational_tryout(snapshot, db) existing_item_result = await db.execute( select(Item).where( Item.website_id == snapshot.website_id, Item.tryout_id == snapshot.source_tryout_id, Item.slot == slot, Item.level == "sedang", ) ) existing_item = existing_item_result.scalar_one_or_none() if existing_item is not None: return existing_item, "existing" item = Item( tryout_id=snapshot.source_tryout_id, website_id=snapshot.website_id, slot=slot, level="sedang", stem=question.question_html, options=options, correct_answer=question.correct_answer, explanation=question.explanation_html, generated_by="manual", source_snapshot_question_id=question.id, variant_status="active", calibrated=False, calibration_sample_size=0, ) db.add(item) await db.flush() return item, "created" @router.get("", include_in_schema=False) @router.get("/", include_in_schema=False) async def admin_root(request: Request): admin = await _current_admin(request) if admin: return _dashboard_redirect() return _login_redirect() @router.get("/login", include_in_schema=False) async def login_view(request: Request): admin = await _current_admin(request) if admin: return _dashboard_redirect() body = """

Direct environment-backed admin access.

""" return _render_auth_page( request, "Admin Login", "Use the configured admin credentials to access the dashboard.", body, ) @router.post("/login", include_in_schema=False) async def login_submit( request: Request, username: str = Form(...), password: str = Form(...), remember_me: str | None = Form(None), ): if _admin_redis is None: body = """
Admin backend is temporarily unavailable. Please try again.
""" return _render_auth_page( request, "Admin Login", "Use the configured admin credentials to access the dashboard.", body, status_code=503, ) client_ip = request.client.host if request.client else "unknown" rate_limit_key = f"{LOGIN_RATE_LIMIT_PREFIX}{client_ip}" attempts_raw = await _admin_redis.get(rate_limit_key) attempts = int(attempts_raw) if attempts_raw else 0 if attempts >= LOGIN_RATE_LIMIT_MAX_ATTEMPTS: body = """
Too many login attempts. Please wait a few minutes and try again.
""" return _render_auth_page( request, "Admin Login", "Use the configured admin credentials to access the dashboard.", body, status_code=HTTP_429_TOO_MANY_REQUESTS, ) if not ( secrets.compare_digest(username, settings.ADMIN_USERNAME) and secrets.compare_digest(password, settings.ADMIN_PASSWORD) ): attempts = await _admin_redis.incr(rate_limit_key) if attempts == 1: await _admin_redis.expire(rate_limit_key, LOGIN_RATE_LIMIT_WINDOW_SECONDS) body = f"""
Invalid username or password.
""" return _render_auth_page( request, "Admin Login", "Use the configured admin credentials to access the dashboard.", body, status_code=HTTP_401_UNAUTHORIZED, ) await _admin_redis.delete(rate_limit_key) expire = settings.ADMIN_SESSION_EXPIRE_SECONDS response = _dashboard_redirect() secure_cookie = settings.ENVIRONMENT == "production" if remember_me == "on": expire = max(expire, 3600 * 24 * 30) response.set_cookie( "remember_me", "on", expires=expire, path="/admin", secure=secure_cookie, samesite="lax", ) else: response.delete_cookie("remember_me", path="/admin") token = uuid.uuid4().hex response.set_cookie( SESSION_COOKIE, token, expires=expire, path="/admin", httponly=True, secure=secure_cookie, samesite="lax", ) await _admin_redis.set(f"{SESSION_PREFIX}{token}", settings.ADMIN_USERNAME, ex=expire) return response @router.get("/logout", include_in_schema=False) async def logout(request: Request): token = request.cookies.get(SESSION_COOKIE) if token and _admin_redis is not None: await _admin_redis.delete(f"{SESSION_PREFIX}{token}") response = _login_redirect() response.delete_cookie(SESSION_COOKIE, path="/admin") response.delete_cookie("remember_me", path="/admin") return response @router.get("/password", include_in_schema=False) async def password_view(request: Request): admin = await _current_admin(request) if not admin: return _login_redirect() body = f"""

Signed in as {escape(admin.username)}.

Password changes are disabled in the UI for this deployment.

Update ADMIN_PASSWORD in the server environment, then restart the app.

Session expiry is currently set to {settings.ADMIN_SESSION_EXPIRE_SECONDS} seconds.

Back to dashboard

""" return _render_auth_page( request, "Password Management", "Runtime password rotation is intentionally disabled.", body, ) @router.post("/password", include_in_schema=False) async def password_submit( request: Request, old_password: str = Form(...), new_password: str = Form(...), re_new_password: str = Form(...), ): _ = (old_password, new_password, re_new_password) admin = await _current_admin(request) if not admin: return _login_redirect() body = """
Password rotation via UI is disabled.

Update ADMIN_PASSWORD in the server environment, then restart the app.

Back to dashboard

""" return _render_auth_page( request, "Password Management", "Runtime password rotation is intentionally disabled.", body, status_code=400, ) @router.get("/dashboard", include_in_schema=False) async def dashboard_view(request: Request, db: AsyncSession = Depends(get_db)): admin = await _current_admin(request) if not admin: return _login_redirect() tryouts = await db.scalar(select(func.count()).select_from(Tryout)) or 0 items = await db.scalar(select(func.count()).select_from(Item)) or 0 sessions = await db.scalar(select(func.count()).select_from(Session)) or 0 completed_sessions = ( await db.scalar(select(func.count()).select_from(Session).where(Session.is_completed.is_(True))) or 0 ) body = f"""

Signed in as {escape(admin.username)}.

Tryouts{tryouts}
Items{items}
Sessions{sessions}
Completed Sessions{completed_sessions}

Open AI Playground

""" return _render_admin_page(request, "IRT Bank Soal Admin", "Dashboard", body) @router.get("/websites", include_in_schema=False) async def websites_view(request: Request, db: AsyncSession = Depends(get_db)): admin = await _current_admin(request) if not admin: return _login_redirect() result = await db.execute(select(Website).order_by(Website.id.asc())) websites = list(result.scalars().all()) body = _websites_form_body(websites) return _render_admin_page(request, "Websites", "Websites", body) @router.post("/websites", include_in_schema=False) async def websites_submit( request: Request, db: AsyncSession = Depends(get_db), site_name: str = Form(...), site_url: str = Form(...), ): admin = await _current_admin(request) if not admin: return _login_redirect() normalized_name = site_name.strip() normalized_url = site_url.strip().rstrip("/") if not normalized_name: result = await db.execute(select(Website).order_by(Website.id.asc())) websites = list(result.scalars().all()) body = _websites_form_body( websites, error="Website name is required.", site_name=site_name, site_url=site_url, ) return _render_admin_page(request, "Websites", "Websites", body) if not normalized_url.startswith(("http://", "https://")): result = await db.execute(select(Website).order_by(Website.id.asc())) websites = list(result.scalars().all()) body = _websites_form_body( websites, error="Website URL must start with http:// or https://.", site_name=site_name, site_url=site_url, ) return _render_admin_page(request, "Websites", "Websites", body) website = Website(site_name=normalized_name, site_url=normalized_url) db.add(website) try: await db.commit() except IntegrityError: await db.rollback() result = await db.execute(select(Website).order_by(Website.id.asc())) websites = list(result.scalars().all()) body = _websites_form_body( websites, error="Website URL already exists.", site_name=site_name, site_url=site_url, ) return _render_admin_page(request, "Websites", "Websites", body) result = await db.execute(select(Website).order_by(Website.id.asc())) websites = list(result.scalars().all()) body = _websites_form_body( websites, success=f"Website added successfully with ID {website.id}.", ) return _render_admin_page(request, "Websites", "Websites", body) @router.get("/websites/{website_id}/edit", include_in_schema=False) async def website_edit_view( website_id: int, request: Request, db: AsyncSession = Depends(get_db), ): admin = await _current_admin(request) if not admin: return _login_redirect() website = await db.get(Website, website_id) if website is None: result = await db.execute(select(Website).order_by(Website.id.asc())) websites = list(result.scalars().all()) body = _websites_form_body(websites, error=f"Website not found: {website_id}") return _render_admin_page(request, "Websites", "Websites", body) body = _website_edit_form_body(website) return _render_admin_page(request, "Edit Website", "Edit Website", body) @router.post("/websites/{website_id}/edit", include_in_schema=False) async def website_edit_submit( website_id: int, request: Request, db: AsyncSession = Depends(get_db), site_name: str = Form(...), site_url: str = Form(...), ): admin = await _current_admin(request) if not admin: return _login_redirect() website = await db.get(Website, website_id) if website is None: result = await db.execute(select(Website).order_by(Website.id.asc())) websites = list(result.scalars().all()) body = _websites_form_body(websites, error=f"Website not found: {website_id}") return _render_admin_page(request, "Websites", "Websites", body) normalized_name = site_name.strip() normalized_url = site_url.strip().rstrip("/") if not normalized_name: body = _website_edit_form_body( website, error="Website name is required.", site_name=site_name, site_url=site_url, ) return _render_admin_page(request, "Edit Website", "Edit Website", body) if not normalized_url.startswith(("http://", "https://")): body = _website_edit_form_body( website, error="Website URL must start with http:// or https://.", site_name=site_name, site_url=site_url, ) return _render_admin_page(request, "Edit Website", "Edit Website", body) website.site_name = normalized_name website.site_url = normalized_url try: await db.commit() except IntegrityError: await db.rollback() body = _website_edit_form_body( website, error="Website URL already exists.", site_name=site_name, site_url=site_url, ) return _render_admin_page(request, "Edit Website", "Edit Website", body) await db.refresh(website) body = _website_edit_form_body( website, success=f"Website #{website.id} updated successfully.", ) return _render_admin_page(request, "Edit Website", "Edit Website", body) @router.post("/websites/{website_id}/delete", include_in_schema=False) async def website_delete_submit( website_id: int, request: Request, db: AsyncSession = Depends(get_db), ): admin = await _current_admin(request) if not admin: return _login_redirect() website = await db.get(Website, website_id) if website is None: result = await db.execute(select(Website).order_by(Website.id.asc())) websites = list(result.scalars().all()) body = _websites_form_body(websites, error=f"Website not found: {website_id}") return _render_admin_page(request, "Websites", "Websites", body) deleted_label = f"{website.site_name} ({website.site_url})" await db.delete(website) await db.commit() result = await db.execute(select(Website).order_by(Website.id.asc())) websites = list(result.scalars().all()) body = _websites_form_body( websites, success=f"Website deleted successfully: {deleted_label}", ) return _render_admin_page(request, "Websites", "Websites", body) @router.get("/tryout-import", include_in_schema=False) async def tryout_import_view(request: Request, db: AsyncSession = Depends(get_db)): admin = await _current_admin(request) if not admin: return _login_redirect() websites = await _load_websites(db) snapshots = await _recent_snapshots(db) body = _tryout_import_form_body(websites, snapshots) return _render_admin_page(request, "Tryout Import", "Tryout Import", body) @router.post("/tryout-import/preview", include_in_schema=False) async def tryout_import_preview( request: Request, db: AsyncSession = Depends(get_db), website_id: int = Form(...), file: UploadFile = File(...), ): admin = await _current_admin(request) if not admin: return _login_redirect() websites = await _load_websites(db) snapshots = await _recent_snapshots(db) if not file.filename or not file.filename.lower().endswith(".json"): body = _tryout_import_form_body( websites, snapshots, error="File must be .json format.", selected_website_id=website_id, ) return _render_admin_page(request, "Tryout Import", "Tryout Import", body) try: payload_bytes = await file.read() payload_text = payload_bytes.decode("utf-8") payload = json.loads(payload_text) except UnicodeDecodeError: body = _tryout_import_form_body( websites, snapshots, error="File must be UTF-8 encoded JSON.", selected_website_id=website_id, ) return _render_admin_page(request, "Tryout Import", "Tryout Import", body) except json.JSONDecodeError as exc: body = _tryout_import_form_body( websites, snapshots, error=f"Invalid JSON file: {exc}", selected_website_id=website_id, ) return _render_admin_page(request, "Tryout Import", "Tryout Import", body) try: preview = await preview_tryout_json_import(payload, website_id, db) except TryoutImportError as exc: body = _tryout_import_form_body( websites, snapshots, error=str(exc), selected_website_id=website_id, ) return _render_admin_page(request, "Tryout Import", "Tryout Import", body) preview_token = uuid.uuid4().hex await _admin_redis.set( f"{IMPORT_PREVIEW_PREFIX}{preview_token}", payload_text, ex=IMPORT_PREVIEW_TTL_SECONDS, ) body = _tryout_import_form_body( websites, snapshots, selected_website_id=website_id, preview=preview, preview_token=preview_token, upload_filename=file.filename or "", ) return _render_admin_page(request, "Tryout Import", "Tryout Import", body) @router.post("/tryout-import", include_in_schema=False) async def tryout_import_submit( request: Request, db: AsyncSession = Depends(get_db), website_id: int = Form(...), preview_token: str = Form(...), ): admin = await _current_admin(request) if not admin: return _login_redirect() websites = await _load_websites(db) snapshots = await _recent_snapshots(db) payload_text = await _admin_redis.get(f"{IMPORT_PREVIEW_PREFIX}{preview_token}") if not payload_text: body = _tryout_import_form_body( websites, snapshots, error="Preview token expired. Upload the JSON again and preview before importing.", selected_website_id=website_id, ) return _render_admin_page(request, "Tryout Import", "Tryout Import", body) try: payload = json.loads(payload_text) result = await import_tryout_json_snapshot(payload, website_id, db) await db.commit() except TryoutImportError as exc: await db.rollback() body = _tryout_import_form_body( websites, snapshots, error=str(exc), selected_website_id=website_id, ) return _render_admin_page(request, "Tryout Import", "Tryout Import", body) except Exception: await db.rollback() raise finally: await _admin_redis.delete(f"{IMPORT_PREVIEW_PREFIX}{preview_token}") updated_snapshots = await _recent_snapshots(db) imported_tryouts = result.get("imported_tryouts") or [] imported_count = sum((row.get("question_count") or 0) for row in imported_tryouts) body = _tryout_import_form_body( websites, updated_snapshots, success=( f"Imported {len(imported_tryouts)} tryout snapshot(s) and archived {imported_count} source question reference row(s)." ), selected_website_id=website_id, ) return _render_admin_page(request, "Tryout Import", "Tryout Import", body) @router.get("/snapshot-questions", include_in_schema=False) async def snapshot_questions_view( request: Request, snapshot_id: int, db: AsyncSession = Depends(get_db), ): admin = await _current_admin(request) if not admin: return _login_redirect() snapshot = await db.get(TryoutImportSnapshot, snapshot_id) if snapshot is None: websites = await _load_websites(db) snapshots = await _recent_snapshots(db) body = _tryout_import_form_body( websites, snapshots, error=f"Snapshot not found: {snapshot_id}", ) return _render_admin_page(request, "Tryout Import", "Tryout Import", body) questions, promoted_items_by_slot, _ = await _load_snapshot_question_context(snapshot, db) body = _snapshot_questions_body(snapshot, questions, promoted_items_by_slot) return _render_admin_page(request, "Snapshot Questions", "Snapshot Questions", body) @router.post("/snapshot-questions/promote-bulk", include_in_schema=False) async def snapshot_question_promote_bulk( request: Request, snapshot_id: int = Form(...), snapshot_question_ids: list[int] | None = Form(None), db: AsyncSession = Depends(get_db), ): admin = await _current_admin(request) if not admin: return _login_redirect() snapshot = await db.get(TryoutImportSnapshot, snapshot_id) if snapshot is None: websites = await _load_websites(db) snapshots = await _recent_snapshots(db) body = _tryout_import_form_body( websites, snapshots, error=f"Snapshot not found: {snapshot_id}", ) return _render_admin_page(request, "Tryout Import", "Tryout Import", body) if not snapshot_question_ids: questions, promoted_items_by_slot, _ = await _load_snapshot_question_context(snapshot, db) body = _snapshot_questions_body( snapshot, questions, promoted_items_by_slot, error="Select at least one snapshot question to promote.", ) return _render_admin_page(request, "Snapshot Questions", "Snapshot Questions", body) question_result = await db.execute( select(TryoutSnapshotQuestion).where( TryoutSnapshotQuestion.id.in_(snapshot_question_ids) ) ) selected_questions = list(question_result.scalars().all()) created_items: list[Item] = [] existing_items: list[Item] = [] missing_option_count = 0 mismatch_count = 0 for question in selected_questions: item, status = await _promote_snapshot_question_to_item(snapshot, question, db) if status == "created" and item is not None: created_items.append(item) elif status == "existing" and item is not None: existing_items.append(item) elif status == "missing_options": missing_option_count += 1 elif status == "mismatch": mismatch_count += 1 await db.commit() questions, promoted_items_by_slot, _ = await _load_snapshot_question_context(snapshot, db) success_parts = [] if created_items: success_parts.append(f"created {len(created_items)} item(s)") if existing_items: success_parts.append(f"reused {len(existing_items)} existing item(s)") if missing_option_count: success_parts.append(f"skipped {missing_option_count} question(s) with missing option text") if mismatch_count: success_parts.append(f"skipped {mismatch_count} mismatched question(s)") success_message = "Bulk promote finished: " + ", ".join(success_parts) + "." if created_items: success_message += f" Latest basis item ID: {created_items[-1].id}." body = _snapshot_questions_body(snapshot, questions, promoted_items_by_slot, success=success_message) return _render_admin_page(request, "Snapshot Questions", "Snapshot Questions", body) @router.get("/calibration-status", include_in_schema=False) async def calibration_status_view(request: Request, db: AsyncSession = Depends(get_db)): admin = await _current_admin(request) if not admin: return _login_redirect() result = await db.execute(select(Tryout.tryout_id, Tryout.name, Tryout.website_id).order_by(Tryout.id)) tryouts = result.all() rows = [] for tryout_id, name, website_id in tryouts: status = await get_calibration_status(tryout_id, website_id, db) rows.append( [ tryout_id, name, status["total_items"], status["calibrated_items"], f'{status["calibration_percentage"]:.2f}%', "Yes" if status["ready_for_irt"] else "No", ] ) body = _table( ["Tryout ID", "Name", "Total Items", "Calibrated", "Calibration %", "Ready for IRT"], rows, ) return _render_admin_page(request, "Calibration Status", "Calibration Status", body) @router.get("/item-statistics", include_in_schema=False) async def item_statistics_view(request: Request, db: AsyncSession = Depends(get_db)): admin = await _current_admin(request) if not admin: return _login_redirect() result = await db.execute(select(Item.level).distinct()) levels = result.scalars().all() rows = [] for level in levels: item_result = await db.execute(select(Item).where(Item.level == level).order_by(Item.slot).limit(10)) items = item_result.scalars().all() total_responses = sum(item.calibration_sample_size or 0 for item in items) calibrated_count = sum(1 for item in items if item.calibrated) calibration_percentage = (calibrated_count / len(items) * 100) if items else 0 avg_correctness = sum(item.ctt_p or 0 for item in items) / len(items) if items else 0 rows.append( [ level, len(items), calibrated_count, f"{calibration_percentage:.2f}%", total_responses, f"{avg_correctness:.4f}", ] ) body = _table( ["Level", "Total Items", "Calibrated", "Calibration %", "Responses", "Avg Correctness"], rows, ) return _render_admin_page(request, "Item Statistics", "Item Statistics", body) @router.get("/session-overview", include_in_schema=False) async def session_overview_view(request: Request, db: AsyncSession = Depends(get_db)): admin = await _current_admin(request) if not admin: return _login_redirect() result = await db.execute(select(Session).order_by(Session.created_at.desc()).limit(50)) sessions = result.scalars().all() rows = [ [ session.session_id, session.wp_user_id, session.tryout_id, "Yes" if session.is_completed else "No", session.scoring_mode_used, session.total_benar, session.NM, session.NN, session.theta, ] for session in sessions ] body = _table( ["Session ID", "WP User", "Tryout", "Completed", "Mode", "Benar", "NM", "NN", "Theta"], rows, ) return _render_admin_page(request, "Session Overview", "Session Overview", body) @router.get("/basis-items", include_in_schema=False) async def basis_items_view(request: Request, db: AsyncSession = Depends(get_db)): admin = await _current_admin(request) if not admin: return _login_redirect() result = await db.execute( select(Item) .where(Item.level == "sedang", Item.generated_by != "ai") .order_by(Item.updated_at.desc(), Item.id.desc()) .limit(200) ) basis_items = list(result.scalars().all()) body = _basis_items_list_body(basis_items) return _render_admin_page(request, "Basis Items", "Basis Items", body) @router.get("/basis-items/{basis_item_id}", include_in_schema=False) async def basis_item_workspace_view( basis_item_id: int, request: Request, db: AsyncSession = Depends(get_db), ): admin = await _current_admin(request) if not admin: return _login_redirect() status_filter = (request.query_params.get("status") or "").strip() level_filter = (request.query_params.get("level") or "").strip() run_id_filter = (request.query_params.get("run_id") or "").strip() min_frequency_filter = (request.query_params.get("min_frequency") or "").strip() filters = { "status": status_filter, "level": level_filter, "run_id": run_id_filter, "min_frequency": min_frequency_filter, } basis_item = await db.get(Item, basis_item_id) if basis_item is None or basis_item.generated_by == "ai" or basis_item.level != "sedang": result = await db.execute( select(Item) .where(Item.level == "sedang", Item.generated_by != "ai") .order_by(Item.updated_at.desc(), Item.id.desc()) .limit(200) ) body = _basis_items_list_body(list(result.scalars().all())) return _render_admin_page(request, "Basis Items", "Basis Items", body) run_result = await db.execute( select(AIGenerationRun) .where(AIGenerationRun.basis_item_id == basis_item.id) .order_by(AIGenerationRun.id.desc()) .limit(50) ) runs = list(run_result.scalars().all()) variant_result = await db.execute( select(Item).where(Item.generated_by == "ai", Item.basis_item_id == basis_item.id) .order_by(Item.created_at.desc(), Item.id.desc()) .limit(300) ) variants_all = list(variant_result.scalars().all()) variants = variants_all if status_filter: variants = [item for item in variants if item.variant_status == status_filter] if level_filter in {"mudah", "sulit"}: variants = [item for item in variants if item.level == level_filter] if run_id_filter.isdigit(): rid = int(run_id_filter) variants = [item for item in variants if item.generation_run_id == rid] usage_metrics, family_stats = await _family_usage_stats(db, basis_item, variants) if min_frequency_filter: try: min_freq = float(min_frequency_filter) variants = [ item for item in variants if usage_metrics.get(item.id, {}).get("frequency", 0.0) >= min_freq ] except ValueError: pass body = _basis_item_workspace_body( basis_item, runs, variants, usage_metrics, family_stats, filters, ) return _render_admin_page(request, f"Basis Item #{basis_item.id}", f"Basis Item Workspace #{basis_item.id}", body, ) @router.post("/basis-items/{basis_item_id}/generate", include_in_schema=False) async def basis_item_generate_submit( basis_item_id: int, request: Request, db: AsyncSession = Depends(get_db), target_level: str = Form(...), ai_model: str = Form(""), generation_count: int = Form(1), operator_notes: str = Form(""), include_note_for_admin: str | None = Form(None), include_note_in_prompt: str | None = Form(None), ): admin = await _current_admin(request) if not admin: return _login_redirect() filters = {"status": "", "level": "", "run_id": "", "min_frequency": ""} basis_item = await db.get(Item, basis_item_id) if basis_item is None or basis_item.generated_by == "ai" or basis_item.level != "sedang": return RedirectResponse(url="/admin/basis-items", status_code=HTTP_303_SEE_OTHER) # Llama-only policy for production quality consistency. ai_model = settings.OPENROUTER_MODEL_LLAMA note_for_admin = include_note_for_admin == "on" note_in_prompt = include_note_in_prompt == "on" if not settings.OPENROUTER_API_KEY: run_result = await db.execute( select(AIGenerationRun) .where(AIGenerationRun.basis_item_id == basis_item.id) .order_by(AIGenerationRun.id.desc()) .limit(50) ) variant_result = await db.execute( select(Item) .where(Item.generated_by == "ai", Item.basis_item_id == basis_item.id) .order_by(Item.created_at.desc(), Item.id.desc()) .limit(300) ) runs = list(run_result.scalars().all()) variants = list(variant_result.scalars().all()) usage_metrics, family_stats = await _family_usage_stats(db, basis_item, variants) body = _basis_item_workspace_body( basis_item, runs, variants, usage_metrics, family_stats, filters, error="OPENROUTER_API_KEY is not configured.", target_level=target_level, ai_model=ai_model, generation_count=str(generation_count), operator_notes=operator_notes, include_note_for_admin=note_for_admin, include_note_in_prompt=note_in_prompt, ) return _render_admin_page(request, f"Basis Item #{basis_item.id}", f"Basis Item Workspace #{basis_item.id}", body, ) if target_level not in {"mudah", "sulit"}: return RedirectResponse(url=f"/admin/basis-items/{basis_item.id}", status_code=HTTP_303_SEE_OTHER) if generation_count < 1 or generation_count > 50: return RedirectResponse(url=f"/admin/basis-items/{basis_item.id}", status_code=HTTP_303_SEE_OTHER) run_id = await create_generation_run( basis_item_id=basis_item.id, source_snapshot_question_id=basis_item.source_snapshot_question_id, target_level=target_level, requested_count=generation_count, model=ai_model, created_by=admin.username, operator_notes=(operator_notes.strip() or None) if note_for_admin else None, db=db, ) generated = await generate_questions_batch( basis_item=basis_item, target_level=target_level, ai_model=ai_model, count=generation_count, operator_notes=operator_notes if note_in_prompt else None, ) from app.schemas.ai import GeneratedQuestion saved = 0 for generated_question in generated: item_id = await save_ai_question( generated_data=GeneratedQuestion( stem=generated_question.stem, options=generated_question.options, correct=generated_question.correct, explanation=generated_question.explanation or None, ), tryout_id=basis_item.tryout_id, website_id=basis_item.website_id, basis_item_id=basis_item.id, slot=basis_item.slot, level=target_level, ai_model=ai_model, generation_run_id=run_id, source_snapshot_question_id=basis_item.source_snapshot_question_id, variant_status="draft", db=db, ) if item_id: saved += 1 await db.commit() run_result = await db.execute( select(AIGenerationRun) .where(AIGenerationRun.basis_item_id == basis_item.id) .order_by(AIGenerationRun.id.desc()) .limit(50) ) variant_result = await db.execute( select(Item) .where(Item.generated_by == "ai", Item.basis_item_id == basis_item.id) .order_by(Item.created_at.desc(), Item.id.desc()) .limit(300) ) runs = list(run_result.scalars().all()) variants = list(variant_result.scalars().all()) usage_metrics, family_stats = await _family_usage_stats(db, basis_item, variants) status_message = ( f"Run #{run_id} failed to produce savable variants. " f"Requested={generation_count}, Generated={len(generated)}, Saved={saved}. " "Check model output/credentials and server logs." if saved == 0 else f"Run #{run_id} finished. Requested={generation_count}, Generated={len(generated)}, Saved={saved}." ) body = _basis_item_workspace_body( basis_item, runs, variants, usage_metrics, family_stats, filters, error=status_message if saved == 0 else None, success=status_message if saved > 0 else None, target_level=target_level, ai_model=ai_model, generation_count=str(generation_count), include_note_for_admin=note_for_admin, include_note_in_prompt=note_in_prompt, ) return _render_admin_page(request, f"Basis Item #{basis_item.id}", f"Basis Item Workspace #{basis_item.id}", body, ) @router.post("/basis-items/{basis_item_id}/review-bulk", include_in_schema=False) async def basis_item_review_bulk( basis_item_id: int, request: Request, db: AsyncSession = Depends(get_db), item_ids: list[int] = Form([]), action: str = Form(...), ): filters = {"status": "", "level": "", "run_id": "", "min_frequency": ""} admin = await _current_admin(request) if not admin: return _login_redirect() basis_item = await db.get(Item, basis_item_id) if basis_item is None: return RedirectResponse(url="/admin/basis-items", status_code=HTTP_303_SEE_OTHER) valid_actions = {"approved", "rejected", "archived", "stale", "active"} if action in valid_actions and item_ids: result = await db.execute( select(Item).where( Item.id.in_(item_ids), Item.generated_by == "ai", Item.basis_item_id == basis_item.id, ) ) items = list(result.scalars().all()) reviewed_at = datetime.now(timezone.utc) for item in items: item.variant_status = action item.reviewed_by = admin.username item.reviewed_at = reviewed_at await db.commit() run_result = await db.execute( select(AIGenerationRun) .where(AIGenerationRun.basis_item_id == basis_item.id) .order_by(AIGenerationRun.id.desc()) .limit(50) ) variant_result = await db.execute( select(Item) .where(Item.generated_by == "ai", Item.basis_item_id == basis_item.id) .order_by(Item.created_at.desc(), Item.id.desc()) .limit(300) ) runs = list(run_result.scalars().all()) variants = list(variant_result.scalars().all()) usage_metrics, family_stats = await _family_usage_stats(db, basis_item, variants) body = _basis_item_workspace_body( basis_item, runs, variants, usage_metrics, family_stats, filters, success=f"Applied status '{action}' to selected variants.", ) return _render_admin_page(request, f"Basis Item #{basis_item.id}", f"Basis Item Workspace #{basis_item.id}", body, ) def _ai_form_body( key_configured: bool, stats: dict[str, Any], error: str | None = None, success: str | None = None, generation_summary: dict[str, Any] | None = None, basis_items: list[Item] | None = None, generation_runs: list[AIGenerationRun] | None = None, generated_variants: list[Item] | None = None, basis_item_id: str = "", target_level: str = "mudah", ai_model: str = settings.OPENROUTER_MODEL_LLAMA, generation_count: str = "1", operator_notes: str = "", include_note_for_admin: bool = True, include_note_in_prompt: bool = False, ) -> str: error_html = f'
{escape(error)}
' if error else "" success_html = f'
{escape(success)}
' if success else "" basis_items = basis_items or [] generation_runs = generation_runs or [] generated_variants = generated_variants or [] basis_rows = [ [ item.id, item.tryout_id, item.slot, item.website_id, _truncate(item.stem, 90), ] for item in basis_items ] basis_table = _table( ["Item ID", "Tryout", "Slot", "Website", "Stem"], basis_rows, ) seed_callout = "" if not basis_items: seed_callout = """
No sedang basis items found yet. Seed one demo website, tryout, and basis item to test AI generation immediately.
""" summary_html = "" if generation_summary: saved_item_ids = generation_summary.get("saved_item_ids") or [] summary_html = f"""

Latest Generation Run

Run ID{generation_summary.get("run_id", "-")}
Requested{generation_summary.get("requested_count", 0)}
Generated{generation_summary.get("generated_count", 0)}
Saved{len(saved_item_ids)}

Each saved output starts as draft. Review per item below to approve, reject, archive, stale, or activate.

""" run_rows = [ [ run.id, run.basis_item_id, run.target_level, run.requested_count, run.model, run.created_by, str(run.created_at), ] for run in generation_runs ] runs_table = _table( ["Run ID", "Basis Item", "Target", "Requested", "Model", "Created By", "Created At"], run_rows, ) variant_rows = [] for item in generated_variants: variant_rows.append( "" f"" f"{item.id}" f"{item.generation_run_id or '-'}" f"{item.basis_item_id or '-'}" f"{escape(item.level)}" f"{escape(item.variant_status)}" f"{escape(item.ai_model or '-')}" f"{escape(_truncate(item.stem, 100))}" f"{escape(str(item.created_at))}" "" ) variants_table = ( "
" "
" "" "" "
" "" + ("".join(variant_rows) if variant_rows else "") + "
el.checked = this.checked)\">Item IDRun IDBasisLevelStatusModelStemCreated At
No AI-generated variants yet.
" ) return f"""

OpenRouter key configured: {"Yes" if key_configured else "No"}

Total AI-generated items: {stats.get("total_ai_items", 0)}

Hybrid workflow: one run can generate one or many variants; each item remains independently reviewable.

{success_html} {error_html} {seed_callout}

Recommended: 1-3 variants per run. Larger runs can increase overlap and review burden. Backend safety cap: 50.

Example note: Use simple wording, one-step arithmetic, avoid trick options.

Available Sedang Basis Items

The generator needs a sedang item. Use one of these IDs, or seed demo data if the table is empty.

{basis_table} {summary_html}

Recent Generation Runs

{runs_table}

Generated Variants (Review Queue)

Use bulk review actions to move items from draft to approved/active, or reject/archive/stale.

{variants_table} """ @router.get("/ai-playground", include_in_schema=False) async def ai_playground_view(request: Request, db: AsyncSession = Depends(get_db)): admin = await _current_admin(request) if not admin: return _login_redirect() stats = await get_ai_stats(db) basis_items = await _basis_items_for_playground(db) basis_item_id = request.query_params.get("basis_item_id", "") generation_runs = await _recent_generation_runs(db) selected_basis_item_id: int | None = None if basis_item_id and str(basis_item_id).isdigit(): selected_basis_item_id = int(str(basis_item_id)) generated_variants = await _recent_generated_variants( db, basis_item_id=selected_basis_item_id, ) body = _ai_form_body( bool(settings.OPENROUTER_API_KEY), stats, basis_items=basis_items, generation_runs=generation_runs, generated_variants=generated_variants, basis_item_id=str(basis_item_id or ""), ) return _render_admin_page(request, "AI Playground", "AI Playground", body) @router.post("/ai-playground/seed-demo", include_in_schema=False) async def ai_playground_seed_demo(request: Request, db: AsyncSession = Depends(get_db)): admin = await _current_admin(request) if not admin: return _login_redirect() demo_item = await _find_or_create_demo_basis_item(db) stats = await get_ai_stats(db) basis_items = await _basis_items_for_playground(db) generation_runs = await _recent_generation_runs(db) generated_variants = await _recent_generated_variants(db) body = _ai_form_body( bool(settings.OPENROUTER_API_KEY), stats, success=f"Demo basis item is ready: item #{demo_item.id}, tryout {demo_item.tryout_id}, slot {demo_item.slot}.", basis_items=basis_items, generation_runs=generation_runs, generated_variants=generated_variants, basis_item_id=str(demo_item.id), ) return _render_admin_page(request, "AI Playground", "AI Playground", body) @router.post("/ai-playground", include_in_schema=False) async def ai_playground_submit( request: Request, db: AsyncSession = Depends(get_db), basis_item_id: int = Form(...), target_level: str = Form(...), ai_model: str = Form(""), generation_count: int = Form(1), operator_notes: str = Form(""), include_note_for_admin: str | None = Form(None), include_note_in_prompt: str | None = Form(None), ): admin = await _current_admin(request) if not admin: return _login_redirect() stats = await get_ai_stats(db) basis_items = await _basis_items_for_playground(db) generation_runs = await _recent_generation_runs(db) generated_variants = await _recent_generated_variants(db) # Llama-only policy for production quality consistency. ai_model = settings.OPENROUTER_MODEL_LLAMA note_for_admin = include_note_for_admin == "on" note_in_prompt = include_note_in_prompt == "on" if not settings.OPENROUTER_API_KEY: body = _ai_form_body( False, stats, error="OPENROUTER_API_KEY is not configured in the environment.", basis_items=basis_items, generation_runs=generation_runs, generated_variants=generated_variants, basis_item_id=str(basis_item_id), target_level=target_level, ai_model=ai_model, generation_count=str(generation_count), operator_notes=operator_notes, include_note_for_admin=note_for_admin, include_note_in_prompt=note_in_prompt, ) return _render_admin_page(request, "AI Playground", "AI Playground", body) if target_level not in {"mudah", "sulit"}: body = _ai_form_body( True, stats, error="Target level must be mudah or sulit.", basis_items=basis_items, generation_runs=generation_runs, generated_variants=generated_variants, basis_item_id=str(basis_item_id), target_level=target_level, ai_model=ai_model, generation_count=str(generation_count), operator_notes=operator_notes, include_note_for_admin=note_for_admin, include_note_in_prompt=note_in_prompt, ) return _render_admin_page(request, "AI Playground", "AI Playground", body) result = await db.execute(select(Item).where(Item.id == basis_item_id)) basis_item = result.scalar_one_or_none() if not basis_item: body = _ai_form_body( True, stats, error=f"Basis item not found: {basis_item_id}", basis_items=basis_items, generation_runs=generation_runs, generated_variants=generated_variants, basis_item_id=str(basis_item_id), target_level=target_level, ai_model=ai_model, generation_count=str(generation_count), operator_notes=operator_notes, include_note_for_admin=note_for_admin, include_note_in_prompt=note_in_prompt, ) return _render_admin_page(request, "AI Playground", "AI Playground", body) if basis_item.level != "sedang": body = _ai_form_body( True, stats, error=f"Basis item must be sedang level, got: {basis_item.level}", basis_items=basis_items, generation_runs=generation_runs, generated_variants=generated_variants, basis_item_id=str(basis_item_id), target_level=target_level, ai_model=ai_model, generation_count=str(generation_count), operator_notes=operator_notes, include_note_for_admin=note_for_admin, include_note_in_prompt=note_in_prompt, ) return _render_admin_page(request, "AI Playground", "AI Playground", body) if generation_count < 1 or generation_count > 50: body = _ai_form_body( True, stats, error="Generate count must be between 1 and 50.", basis_items=basis_items, generation_runs=generation_runs, generated_variants=generated_variants, basis_item_id=str(basis_item_id), target_level=target_level, ai_model=ai_model, generation_count=str(generation_count), operator_notes=operator_notes, include_note_for_admin=note_for_admin, include_note_in_prompt=note_in_prompt, ) return _render_admin_page(request, "AI Playground", "AI Playground", body) run_id = await create_generation_run( basis_item_id=basis_item.id, source_snapshot_question_id=basis_item.source_snapshot_question_id, target_level=target_level, requested_count=generation_count, model=ai_model, created_by=admin.username, operator_notes=(operator_notes.strip() or None) if note_for_admin else None, db=db, ) generated = await generate_questions_batch( basis_item=basis_item, target_level=target_level, ai_model=ai_model, count=generation_count, operator_notes=operator_notes if note_in_prompt else None, ) saved_item_ids: list[int] = [] from app.schemas.ai import GeneratedQuestion for generated_question in generated: item_id = await save_ai_question( generated_data=GeneratedQuestion( stem=generated_question.stem, options=generated_question.options, correct=generated_question.correct, explanation=generated_question.explanation or None, ), tryout_id=basis_item.tryout_id, website_id=basis_item.website_id, basis_item_id=basis_item.id, slot=basis_item.slot, level=target_level, ai_model=ai_model, generation_run_id=run_id, source_snapshot_question_id=basis_item.source_snapshot_question_id, variant_status="draft", db=db, ) if item_id: saved_item_ids.append(item_id) await db.commit() updated_stats = await get_ai_stats(db) updated_basis_items = await _basis_items_for_playground(db) updated_runs = await _recent_generation_runs(db) updated_variants = await _recent_generated_variants(db) if not saved_item_ids: body = _ai_form_body( True, updated_stats, error="Generation run completed but no items were saved. Check model output and logs.", basis_items=updated_basis_items, generation_runs=updated_runs, generated_variants=updated_variants, basis_item_id=str(basis_item_id), target_level=target_level, ai_model=ai_model, generation_count=str(generation_count), operator_notes=operator_notes, include_note_for_admin=note_for_admin, include_note_in_prompt=note_in_prompt, ) return _render_admin_page(request, "AI Playground", "AI Playground", body) body = _ai_form_body( True, updated_stats, success=f"Generation run #{run_id} completed. Saved {len(saved_item_ids)} item(s).", generation_summary={ "run_id": run_id, "requested_count": generation_count, "generated_count": len(generated), "saved_item_ids": saved_item_ids, }, basis_items=updated_basis_items, generation_runs=updated_runs, generated_variants=updated_variants, basis_item_id=str(basis_item_id), target_level=target_level, ai_model=ai_model, generation_count=str(generation_count), operator_notes=operator_notes, include_note_for_admin=note_for_admin, include_note_in_prompt=note_in_prompt, ) return _render_admin_page(request, "AI Playground", "AI Playground", body) @router.post("/ai-playground/save", include_in_schema=False) async def ai_playground_save( request: Request, db: AsyncSession = Depends(get_db), basis_item_id: int = Form(...), tryout_id: str = Form(...), website_id: int = Form(...), slot: int = Form(...), target_level: str = Form(...), ai_model: str = Form(...), stem: str = Form(...), options_json: str = Form(...), correct: str = Form(...), explanation: str = Form(""), ): admin = await _current_admin(request) if not admin: return _login_redirect() stats = await get_ai_stats(db) basis_items = await _basis_items_for_playground(db) if target_level not in {"mudah", "sulit"}: body = _ai_form_body( bool(settings.OPENROUTER_API_KEY), stats, error="Only mudah or sulit generated items can be saved from the playground.", basis_items=basis_items, ) return _render_admin_page(request, "AI Playground", "AI Playground", body) try: options = json.loads(options_json) except json.JSONDecodeError: body = _ai_form_body( bool(settings.OPENROUTER_API_KEY), stats, error="Generated options payload is invalid.", basis_items=basis_items, ) return _render_admin_page(request, "AI Playground", "AI Playground", body) from app.schemas.ai import GeneratedQuestion item_id = await save_ai_question( generated_data=GeneratedQuestion( stem=stem, options=options, correct=correct, explanation=explanation or None, ), tryout_id=tryout_id, website_id=website_id, basis_item_id=basis_item_id, slot=slot, level=target_level, ai_model=ai_model, variant_status="draft", db=db, ) if not item_id: body = _ai_form_body( bool(settings.OPENROUTER_API_KEY), stats, error="Failed to save generated item. Check server logs for the database error.", basis_items=basis_items, ) return _render_admin_page(request, "AI Playground", "AI Playground", body) await db.commit() updated_stats = await get_ai_stats(db) updated_basis_items = await _basis_items_for_playground(db) body = _ai_form_body( bool(settings.OPENROUTER_API_KEY), updated_stats, success=f"Generated item saved successfully as item #{item_id}.", basis_items=updated_basis_items, basis_item_id=str(basis_item_id), target_level=target_level, ai_model=ai_model, ) return _render_admin_page(request, "AI Playground", "AI Playground", body) @router.post("/ai-playground/review-bulk", include_in_schema=False) async def ai_playground_review_bulk( request: Request, db: AsyncSession = Depends(get_db), item_ids: list[int] = Form([]), action: str = Form(...), ): admin = await _current_admin(request) if not admin: return _login_redirect() valid_actions = {"approved", "rejected", "archived", "stale", "active"} stats = await get_ai_stats(db) basis_items = await _basis_items_for_playground(db) generation_runs = await _recent_generation_runs(db) generated_variants = await _recent_generated_variants(db) if action not in valid_actions: body = _ai_form_body( bool(settings.OPENROUTER_API_KEY), stats, error="Invalid review action.", basis_items=basis_items, generation_runs=generation_runs, generated_variants=generated_variants, ) return _render_admin_page(request, "AI Playground", "AI Playground", body) if not item_ids: body = _ai_form_body( bool(settings.OPENROUTER_API_KEY), stats, error="Select at least one generated item.", basis_items=basis_items, generation_runs=generation_runs, generated_variants=generated_variants, ) return _render_admin_page(request, "AI Playground", "AI Playground", body) result = await db.execute( select(Item).where(Item.id.in_(item_ids), Item.generated_by == "ai") ) items = list(result.scalars().all()) if not items: body = _ai_form_body( bool(settings.OPENROUTER_API_KEY), stats, error="No matching AI-generated items were found for the selected IDs.", basis_items=basis_items, generation_runs=generation_runs, generated_variants=generated_variants, ) return _render_admin_page(request, "AI Playground", "AI Playground", body) reviewed_at = datetime.now(timezone.utc) for item in items: item.variant_status = action item.reviewed_by = admin.username item.reviewed_at = reviewed_at await db.commit() updated_stats = await get_ai_stats(db) updated_basis_items = await _basis_items_for_playground(db) updated_runs = await _recent_generation_runs(db) updated_variants = await _recent_generated_variants(db) body = _ai_form_body( bool(settings.OPENROUTER_API_KEY), updated_stats, success=f"Updated {len(items)} item(s) to status '{action}'.", basis_items=updated_basis_items, generation_runs=updated_runs, generated_variants=updated_variants, ) return _render_admin_page(request, "AI Playground", "AI Playground", body) @router.get("/tryout/list", include_in_schema=False) @router.get("/item/list", include_in_schema=False) @router.get("/user/list", include_in_schema=False) @router.get("/session/list", include_in_schema=False) @router.get("/tryoutstats/list", include_in_schema=False) async def legacy_admin_paths(request: Request): admin = await _current_admin(request) if not admin: return _login_redirect() return _dashboard_redirect()