' if success else ""
display_name = website.site_name if site_name is None else site_name
display_url = website.site_url if site_url is None else site_url
return f"""
"""
return _render_admin_page("IRT Bank Soal Admin", "Dashboard", body)
@router.get("/websites", include_in_schema=False)
async def websites_view(request: Request, db: AsyncSession = Depends(get_db)):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(websites)
return _render_admin_page("Websites", "Websites", body)
@router.post("/websites", include_in_schema=False)
async def websites_submit(
request: Request,
db: AsyncSession = Depends(get_db),
site_name: str = Form(...),
site_url: str = Form(...),
):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
normalized_name = site_name.strip()
normalized_url = site_url.strip().rstrip("/")
if not normalized_name:
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(
websites,
error="Website name is required.",
site_name=site_name,
site_url=site_url,
)
return _render_admin_page("Websites", "Websites", body)
if not normalized_url.startswith(("http://", "https://")):
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(
websites,
error="Website URL must start with http:// or https://.",
site_name=site_name,
site_url=site_url,
)
return _render_admin_page("Websites", "Websites", body)
website = Website(site_name=normalized_name, site_url=normalized_url)
db.add(website)
try:
await db.commit()
except IntegrityError:
await db.rollback()
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(
websites,
error="Website URL already exists.",
site_name=site_name,
site_url=site_url,
)
return _render_admin_page("Websites", "Websites", body)
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(
websites,
success=f"Website added successfully with ID {website.id}.",
)
return _render_admin_page("Websites", "Websites", body)
@router.get("/websites/{website_id}/edit", include_in_schema=False)
async def website_edit_view(
website_id: int,
request: Request,
db: AsyncSession = Depends(get_db),
):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
website = await db.get(Website, website_id)
if website is None:
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(websites, error=f"Website not found: {website_id}")
return _render_admin_page("Websites", "Websites", body)
body = _website_edit_form_body(website)
return _render_admin_page("Edit Website", "Edit Website", body)
@router.post("/websites/{website_id}/edit", include_in_schema=False)
async def website_edit_submit(
website_id: int,
request: Request,
db: AsyncSession = Depends(get_db),
site_name: str = Form(...),
site_url: str = Form(...),
):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
website = await db.get(Website, website_id)
if website is None:
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(websites, error=f"Website not found: {website_id}")
return _render_admin_page("Websites", "Websites", body)
normalized_name = site_name.strip()
normalized_url = site_url.strip().rstrip("/")
if not normalized_name:
body = _website_edit_form_body(
website,
error="Website name is required.",
site_name=site_name,
site_url=site_url,
)
return _render_admin_page("Edit Website", "Edit Website", body)
if not normalized_url.startswith(("http://", "https://")):
body = _website_edit_form_body(
website,
error="Website URL must start with http:// or https://.",
site_name=site_name,
site_url=site_url,
)
return _render_admin_page("Edit Website", "Edit Website", body)
website.site_name = normalized_name
website.site_url = normalized_url
try:
await db.commit()
except IntegrityError:
await db.rollback()
body = _website_edit_form_body(
website,
error="Website URL already exists.",
site_name=site_name,
site_url=site_url,
)
return _render_admin_page("Edit Website", "Edit Website", body)
await db.refresh(website)
body = _website_edit_form_body(
website,
success=f"Website #{website.id} updated successfully.",
)
return _render_admin_page("Edit Website", "Edit Website", body)
@router.post("/websites/{website_id}/delete", include_in_schema=False)
async def website_delete_submit(
website_id: int,
request: Request,
db: AsyncSession = Depends(get_db),
):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
website = await db.get(Website, website_id)
if website is None:
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(websites, error=f"Website not found: {website_id}")
return _render_admin_page("Websites", "Websites", body)
deleted_label = f"{website.site_name} ({website.site_url})"
await db.delete(website)
await db.commit()
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(
websites,
success=f"Website deleted successfully: {deleted_label}",
)
return _render_admin_page("Websites", "Websites", body)
@router.get("/tryout-import", include_in_schema=False)
async def tryout_import_view(request: Request, db: AsyncSession = Depends(get_db)):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
websites = await _load_websites(db)
snapshots = await _recent_snapshots(db)
body = _tryout_import_form_body(websites, snapshots)
return _render_admin_page("Tryout Import", "Tryout Import", body)
@router.post("/tryout-import/preview", include_in_schema=False)
async def tryout_import_preview(
request: Request,
db: AsyncSession = Depends(get_db),
website_id: int = Form(...),
file: UploadFile = File(...),
):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
websites = await _load_websites(db)
snapshots = await _recent_snapshots(db)
if not file.filename or not file.filename.lower().endswith(".json"):
body = _tryout_import_form_body(
websites,
snapshots,
error="File must be .json format.",
selected_website_id=website_id,
)
return _render_admin_page("Tryout Import", "Tryout Import", body)
try:
payload_bytes = await file.read()
payload_text = payload_bytes.decode("utf-8")
payload = json.loads(payload_text)
except UnicodeDecodeError:
body = _tryout_import_form_body(
websites,
snapshots,
error="File must be UTF-8 encoded JSON.",
selected_website_id=website_id,
)
return _render_admin_page("Tryout Import", "Tryout Import", body)
except json.JSONDecodeError as exc:
body = _tryout_import_form_body(
websites,
snapshots,
error=f"Invalid JSON file: {exc}",
selected_website_id=website_id,
)
return _render_admin_page("Tryout Import", "Tryout Import", body)
try:
preview = await preview_tryout_json_import(payload, website_id, db)
except TryoutImportError as exc:
body = _tryout_import_form_body(
websites,
snapshots,
error=str(exc),
selected_website_id=website_id,
)
return _render_admin_page("Tryout Import", "Tryout Import", body)
preview_token = uuid.uuid4().hex
await _admin_redis.set(
f"{IMPORT_PREVIEW_PREFIX}{preview_token}",
payload_text,
ex=IMPORT_PREVIEW_TTL_SECONDS,
)
body = _tryout_import_form_body(
websites,
snapshots,
selected_website_id=website_id,
preview=preview,
preview_token=preview_token,
upload_filename=file.filename or "",
)
return _render_admin_page("Tryout Import", "Tryout Import", body)
@router.post("/tryout-import", include_in_schema=False)
async def tryout_import_submit(
request: Request,
db: AsyncSession = Depends(get_db),
website_id: int = Form(...),
preview_token: str = Form(...),
):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
websites = await _load_websites(db)
snapshots = await _recent_snapshots(db)
payload_text = await _admin_redis.get(f"{IMPORT_PREVIEW_PREFIX}{preview_token}")
if not payload_text:
body = _tryout_import_form_body(
websites,
snapshots,
error="Preview token expired. Upload the JSON again and preview before importing.",
selected_website_id=website_id,
)
return _render_admin_page("Tryout Import", "Tryout Import", body)
try:
payload = json.loads(payload_text)
result = await import_tryout_json_snapshot(payload, website_id, db)
await db.commit()
except TryoutImportError as exc:
await db.rollback()
body = _tryout_import_form_body(
websites,
snapshots,
error=str(exc),
selected_website_id=website_id,
)
return _render_admin_page("Tryout Import", "Tryout Import", body)
except Exception:
await db.rollback()
raise
finally:
await _admin_redis.delete(f"{IMPORT_PREVIEW_PREFIX}{preview_token}")
updated_snapshots = await _recent_snapshots(db)
imported_tryouts = result.get("imported_tryouts") or []
imported_count = sum((row.get("question_count") or 0) for row in imported_tryouts)
body = _tryout_import_form_body(
websites,
updated_snapshots,
success=(
f"Imported {len(imported_tryouts)} tryout snapshot(s) and archived {imported_count} source question reference row(s)."
),
selected_website_id=website_id,
)
return _render_admin_page("Tryout Import", "Tryout Import", body)
@router.get("/calibration-status", include_in_schema=False)
async def calibration_status_view(request: Request, db: AsyncSession = Depends(get_db)):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
result = await db.execute(select(Tryout.tryout_id, Tryout.name, Tryout.website_id).order_by(Tryout.id))
tryouts = result.all()
rows = []
for tryout_id, name, website_id in tryouts:
status = await get_calibration_status(tryout_id, website_id, db)
rows.append(
[
tryout_id,
name,
status["total_items"],
status["calibrated_items"],
f'{status["calibration_percentage"]:.2f}%',
"Yes" if status["ready_for_irt"] else "No",
]
)
body = _table(
["Tryout ID", "Name", "Total Items", "Calibrated", "Calibration %", "Ready for IRT"],
rows,
)
return _render_admin_page("Calibration Status", "Calibration Status", body)
@router.get("/item-statistics", include_in_schema=False)
async def item_statistics_view(request: Request, db: AsyncSession = Depends(get_db)):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
result = await db.execute(select(Item.level).distinct())
levels = result.scalars().all()
rows = []
for level in levels:
item_result = await db.execute(select(Item).where(Item.level == level).order_by(Item.slot).limit(10))
items = item_result.scalars().all()
total_responses = sum(item.calibration_sample_size or 0 for item in items)
calibrated_count = sum(1 for item in items if item.calibrated)
calibration_percentage = (calibrated_count / len(items) * 100) if items else 0
avg_correctness = sum(item.ctt_p or 0 for item in items) / len(items) if items else 0
rows.append(
[
level,
len(items),
calibrated_count,
f"{calibration_percentage:.2f}%",
total_responses,
f"{avg_correctness:.4f}",
]
)
body = _table(
["Level", "Total Items", "Calibrated", "Calibration %", "Responses", "Avg Correctness"],
rows,
)
return _render_admin_page("Item Statistics", "Item Statistics", body)
@router.get("/session-overview", include_in_schema=False)
async def session_overview_view(request: Request, db: AsyncSession = Depends(get_db)):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
result = await db.execute(select(Session).order_by(Session.created_at.desc()).limit(50))
sessions = result.scalars().all()
rows = [
[
session.session_id,
session.wp_user_id,
session.tryout_id,
"Yes" if session.is_completed else "No",
session.scoring_mode_used,
session.total_benar,
session.NM,
session.NN,
session.theta,
]
for session in sessions
]
body = _table(
["Session ID", "WP User", "Tryout", "Completed", "Mode", "Benar", "NM", "NN", "Theta"],
rows,
)
return _render_admin_page("Session Overview", "Session Overview", body)
def _ai_form_body(
key_configured: bool,
stats: dict[str, Any],
error: str | None = None,
success: str | None = None,
result: dict[str, Any] | None = None,
basis_items: list[Item] | None = None,
basis_item_id: str = "",
target_level: str = "mudah",
ai_model: str = settings.OPENROUTER_MODEL_QWEN,
) -> str:
error_html = f'
{escape(error)}
' if error else ""
success_html = f'
{escape(success)}
' if success else ""
options_html = "".join(
f''
for model, label in SUPPORTED_MODELS.items()
)
basis_items = basis_items or []
basis_rows = [
[
item.id,
item.tryout_id,
item.slot,
item.website_id,
_truncate(item.stem, 90),
]
for item in basis_items
]
basis_table = _table(
["Item ID", "Tryout", "Slot", "Website", "Stem"],
basis_rows,
)
seed_callout = ""
if not basis_items:
seed_callout = """
No sedang basis items found yet. Seed one demo website, tryout, and basis item to test AI generation immediately.
"""
result_html = ""
if result:
options = result.get("options") or {}
save_html = ""
if result.get("basis_item_id") and not result.get("existing_item_id"):
save_html = f"""
"""
elif result.get("existing_item_id"):
save_html = f"""
Slot {escape(str(result.get("slot", "")))} already has a {escape(str(result.get("target_level", "")))} item
for this tryout. Existing item ID: {escape(str(result.get("existing_item_id")))}.