"""
return _render_admin_page("IRT Bank Soal Admin", "Dashboard", body)
@router.get("/websites", include_in_schema=False)
async def websites_view(request: Request, db: AsyncSession = Depends(get_db)):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(websites)
return _render_admin_page("Websites", "Websites", body)
@router.post("/websites", include_in_schema=False)
async def websites_submit(
request: Request,
db: AsyncSession = Depends(get_db),
site_name: str = Form(...),
site_url: str = Form(...),
):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
normalized_name = site_name.strip()
normalized_url = site_url.strip().rstrip("/")
if not normalized_name:
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(
websites,
error="Website name is required.",
site_name=site_name,
site_url=site_url,
)
return _render_admin_page("Websites", "Websites", body)
if not normalized_url.startswith(("http://", "https://")):
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(
websites,
error="Website URL must start with http:// or https://.",
site_name=site_name,
site_url=site_url,
)
return _render_admin_page("Websites", "Websites", body)
website = Website(site_name=normalized_name, site_url=normalized_url)
db.add(website)
try:
await db.commit()
except IntegrityError:
await db.rollback()
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(
websites,
error="Website URL already exists.",
site_name=site_name,
site_url=site_url,
)
return _render_admin_page("Websites", "Websites", body)
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(
websites,
success=f"Website added successfully with ID {website.id}.",
)
return _render_admin_page("Websites", "Websites", body)
@router.get("/calibration-status", include_in_schema=False)
async def calibration_status_view(request: Request, db: AsyncSession = Depends(get_db)):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
result = await db.execute(select(Tryout.tryout_id, Tryout.name, Tryout.website_id).order_by(Tryout.id))
tryouts = result.all()
rows = []
for tryout_id, name, website_id in tryouts:
status = await get_calibration_status(tryout_id, website_id, db)
rows.append(
[
tryout_id,
name,
status["total_items"],
status["calibrated_items"],
f'{status["calibration_percentage"]:.2f}%',
"Yes" if status["ready_for_irt"] else "No",
]
)
body = _table(
["Tryout ID", "Name", "Total Items", "Calibrated", "Calibration %", "Ready for IRT"],
rows,
)
return _render_admin_page("Calibration Status", "Calibration Status", body)
@router.get("/item-statistics", include_in_schema=False)
async def item_statistics_view(request: Request, db: AsyncSession = Depends(get_db)):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
result = await db.execute(select(Item.level).distinct())
levels = result.scalars().all()
rows = []
for level in levels:
item_result = await db.execute(select(Item).where(Item.level == level).order_by(Item.slot).limit(10))
items = item_result.scalars().all()
total_responses = sum(item.calibration_sample_size or 0 for item in items)
calibrated_count = sum(1 for item in items if item.calibrated)
calibration_percentage = (calibrated_count / len(items) * 100) if items else 0
avg_correctness = sum(item.ctt_p or 0 for item in items) / len(items) if items else 0
rows.append(
[
level,
len(items),
calibrated_count,
f"{calibration_percentage:.2f}%",
total_responses,
f"{avg_correctness:.4f}",
]
)
body = _table(
["Level", "Total Items", "Calibrated", "Calibration %", "Responses", "Avg Correctness"],
rows,
)
return _render_admin_page("Item Statistics", "Item Statistics", body)
@router.get("/session-overview", include_in_schema=False)
async def session_overview_view(request: Request, db: AsyncSession = Depends(get_db)):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
result = await db.execute(select(Session).order_by(Session.created_at.desc()).limit(50))
sessions = result.scalars().all()
rows = [
[
session.session_id,
session.wp_user_id,
session.tryout_id,
"Yes" if session.is_completed else "No",
session.scoring_mode_used,
session.total_benar,
session.NM,
session.NN,
session.theta,
]
for session in sessions
]
body = _table(
["Session ID", "WP User", "Tryout", "Completed", "Mode", "Benar", "NM", "NN", "Theta"],
rows,
)
return _render_admin_page("Session Overview", "Session Overview", body)
def _ai_form_body(
key_configured: bool,
stats: dict[str, Any],
error: str | None = None,
success: str | None = None,
result: dict[str, Any] | None = None,
basis_items: list[Item] | None = None,
basis_item_id: str = "",
target_level: str = "mudah",
ai_model: str = settings.OPENROUTER_MODEL_QWEN,
) -> str:
error_html = f'
{escape(error)}
' if error else ""
success_html = f'
{escape(success)}
' if success else ""
options_html = "".join(
f''
for model, label in SUPPORTED_MODELS.items()
)
basis_items = basis_items or []
basis_rows = [
[
item.id,
item.tryout_id,
item.slot,
item.website_id,
_truncate(item.stem, 90),
]
for item in basis_items
]
basis_table = _table(
["Item ID", "Tryout", "Slot", "Website", "Stem"],
basis_rows,
)
seed_callout = ""
if not basis_items:
seed_callout = """
No sedang basis items found yet. Seed one demo website, tryout, and basis item to test AI generation immediately.
"""
result_html = ""
if result:
options = result.get("options") or {}
save_html = ""
if result.get("basis_item_id") and not result.get("existing_item_id"):
save_html = f"""
"""
elif result.get("existing_item_id"):
save_html = f"""
Slot {escape(str(result.get("slot", "")))} already has a {escape(str(result.get("target_level", "")))} item
for this tryout. Existing item ID: {escape(str(result.get("existing_item_id")))}.