' if success else ""
display_name = website.site_name if site_name is None else site_name
display_url = website.site_url if site_url is None else site_url
return f"""
Website ID: {website.id}
{success_html}
{error_html}
"""
async def _basis_items_for_playground(db: AsyncSession, limit: int = 20) -> list[Item]:
result = await db.execute(
select(Item)
.where(Item.level == "sedang")
.order_by(Item.created_at.desc(), Item.id.desc())
.limit(limit)
)
return list(result.scalars().all())
async def _find_or_create_demo_basis_item(db: AsyncSession) -> Item:
result = await db.execute(
select(Item)
.where(
Item.level == "sedang",
Item.generated_by == "manual",
Item.tryout_id == "demo-tryout",
)
.order_by(Item.id.asc())
.limit(1)
)
existing_item = result.scalar_one_or_none()
if existing_item:
return existing_item
website_result = await db.execute(
select(Website).where(Website.site_url == "https://demo.local").limit(1)
)
website = website_result.scalar_one_or_none()
if website is None:
website = Website(site_url="https://demo.local", site_name="Demo Website")
db.add(website)
await db.flush()
tryout_result = await db.execute(
select(Tryout)
.where(Tryout.website_id == website.id, Tryout.tryout_id == "demo-tryout")
.limit(1)
)
tryout = tryout_result.scalar_one_or_none()
if tryout is None:
tryout = Tryout(
website_id=website.id,
tryout_id="demo-tryout",
name="Demo AI Playground Tryout",
description="Seed data for the AI playground.",
scoring_mode="ctt",
selection_mode="fixed",
normalization_mode="static",
ai_generation_enabled=True,
)
db.add(tryout)
await db.flush()
item = Item(
tryout_id=tryout.tryout_id,
website_id=website.id,
slot=1,
level="sedang",
stem="Sebuah toko memberi diskon 20% untuk sebuah tas. Jika harga setelah diskon adalah Rp240.000, berapakah harga tas sebelum diskon?",
options={
"A": "Rp260.000",
"B": "Rp300.000",
"C": "Rp320.000",
"D": "Rp360.000",
},
correct_answer="B",
explanation="Harga setelah diskon 20% berarti 80% dari harga awal. Jadi harga awal = 240.000 / 0,8 = 300.000.",
generated_by="manual",
calibrated=False,
calibration_sample_size=0,
)
db.add(item)
await db.flush()
await db.commit()
await db.refresh(item)
return item
@router.get("", include_in_schema=False)
@router.get("/", include_in_schema=False)
async def admin_root(request: Request):
admin = await _current_admin(request)
if admin:
return _dashboard_redirect()
return _login_redirect()
@router.get("/login", include_in_schema=False)
async def login_view(request: Request):
admin = await _current_admin(request)
if admin:
return _dashboard_redirect()
body = """
Direct environment-backed admin access.
"""
return _render_auth_page(
request,
"Admin Login",
"Use the configured admin credentials to access the dashboard.",
body,
)
@router.post("/login", include_in_schema=False)
async def login_submit(
request: Request,
username: str = Form(...),
password: str = Form(...),
remember_me: str | None = Form(None),
):
if not (
secrets.compare_digest(username, settings.ADMIN_USERNAME)
and secrets.compare_digest(password, settings.ADMIN_PASSWORD)
):
body = f"""
Invalid username or password.
"""
return _render_auth_page(
request,
"Admin Login",
"Use the configured admin credentials to access the dashboard.",
body,
status_code=HTTP_401_UNAUTHORIZED,
)
expire = settings.ADMIN_SESSION_EXPIRE_SECONDS
response = _dashboard_redirect()
if remember_me == "on":
expire = max(expire, 3600 * 24 * 30)
response.set_cookie("remember_me", "on", expires=expire, path="/admin")
else:
response.delete_cookie("remember_me", path="/admin")
token = uuid.uuid4().hex
response.set_cookie(
SESSION_COOKIE,
token,
expires=expire,
path="/admin",
httponly=True,
samesite="lax",
)
await _admin_redis.set(f"{SESSION_PREFIX}{token}", settings.ADMIN_USERNAME, ex=expire)
return response
@router.get("/logout", include_in_schema=False)
async def logout(request: Request):
token = request.cookies.get(SESSION_COOKIE)
if token and _admin_redis is not None:
await _admin_redis.delete(f"{SESSION_PREFIX}{token}")
response = _login_redirect()
response.delete_cookie(SESSION_COOKIE, path="/admin")
response.delete_cookie("remember_me", path="/admin")
return response
@router.get("/password", include_in_schema=False)
async def password_view(request: Request):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
body = f"""
Signed in as {escape(admin.username)}.
Password changes are disabled in the UI for this deployment.
Update ADMIN_PASSWORD in the server environment, then restart the app.
Session expiry is currently set to {settings.ADMIN_SESSION_EXPIRE_SECONDS} seconds.
"""
return _render_admin_page("IRT Bank Soal Admin", "Dashboard", body)
@router.get("/websites", include_in_schema=False)
async def websites_view(request: Request, db: AsyncSession = Depends(get_db)):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(websites)
return _render_admin_page("Websites", "Websites", body)
@router.post("/websites", include_in_schema=False)
async def websites_submit(
request: Request,
db: AsyncSession = Depends(get_db),
site_name: str = Form(...),
site_url: str = Form(...),
):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
normalized_name = site_name.strip()
normalized_url = site_url.strip().rstrip("/")
if not normalized_name:
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(
websites,
error="Website name is required.",
site_name=site_name,
site_url=site_url,
)
return _render_admin_page("Websites", "Websites", body)
if not normalized_url.startswith(("http://", "https://")):
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(
websites,
error="Website URL must start with http:// or https://.",
site_name=site_name,
site_url=site_url,
)
return _render_admin_page("Websites", "Websites", body)
website = Website(site_name=normalized_name, site_url=normalized_url)
db.add(website)
try:
await db.commit()
except IntegrityError:
await db.rollback()
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(
websites,
error="Website URL already exists.",
site_name=site_name,
site_url=site_url,
)
return _render_admin_page("Websites", "Websites", body)
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(
websites,
success=f"Website added successfully with ID {website.id}.",
)
return _render_admin_page("Websites", "Websites", body)
@router.get("/websites/{website_id}/edit", include_in_schema=False)
async def website_edit_view(
website_id: int,
request: Request,
db: AsyncSession = Depends(get_db),
):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
website = await db.get(Website, website_id)
if website is None:
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(websites, error=f"Website not found: {website_id}")
return _render_admin_page("Websites", "Websites", body)
body = _website_edit_form_body(website)
return _render_admin_page("Edit Website", "Edit Website", body)
@router.post("/websites/{website_id}/edit", include_in_schema=False)
async def website_edit_submit(
website_id: int,
request: Request,
db: AsyncSession = Depends(get_db),
site_name: str = Form(...),
site_url: str = Form(...),
):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
website = await db.get(Website, website_id)
if website is None:
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(websites, error=f"Website not found: {website_id}")
return _render_admin_page("Websites", "Websites", body)
normalized_name = site_name.strip()
normalized_url = site_url.strip().rstrip("/")
if not normalized_name:
body = _website_edit_form_body(
website,
error="Website name is required.",
site_name=site_name,
site_url=site_url,
)
return _render_admin_page("Edit Website", "Edit Website", body)
if not normalized_url.startswith(("http://", "https://")):
body = _website_edit_form_body(
website,
error="Website URL must start with http:// or https://.",
site_name=site_name,
site_url=site_url,
)
return _render_admin_page("Edit Website", "Edit Website", body)
website.site_name = normalized_name
website.site_url = normalized_url
try:
await db.commit()
except IntegrityError:
await db.rollback()
body = _website_edit_form_body(
website,
error="Website URL already exists.",
site_name=site_name,
site_url=site_url,
)
return _render_admin_page("Edit Website", "Edit Website", body)
await db.refresh(website)
body = _website_edit_form_body(
website,
success=f"Website #{website.id} updated successfully.",
)
return _render_admin_page("Edit Website", "Edit Website", body)
@router.post("/websites/{website_id}/delete", include_in_schema=False)
async def website_delete_submit(
website_id: int,
request: Request,
db: AsyncSession = Depends(get_db),
):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
website = await db.get(Website, website_id)
if website is None:
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(websites, error=f"Website not found: {website_id}")
return _render_admin_page("Websites", "Websites", body)
deleted_label = f"{website.site_name} ({website.site_url})"
await db.delete(website)
await db.commit()
result = await db.execute(select(Website).order_by(Website.id.asc()))
websites = list(result.scalars().all())
body = _websites_form_body(
websites,
success=f"Website deleted successfully: {deleted_label}",
)
return _render_admin_page("Websites", "Websites", body)
@router.get("/calibration-status", include_in_schema=False)
async def calibration_status_view(request: Request, db: AsyncSession = Depends(get_db)):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
result = await db.execute(select(Tryout.tryout_id, Tryout.name, Tryout.website_id).order_by(Tryout.id))
tryouts = result.all()
rows = []
for tryout_id, name, website_id in tryouts:
status = await get_calibration_status(tryout_id, website_id, db)
rows.append(
[
tryout_id,
name,
status["total_items"],
status["calibrated_items"],
f'{status["calibration_percentage"]:.2f}%',
"Yes" if status["ready_for_irt"] else "No",
]
)
body = _table(
["Tryout ID", "Name", "Total Items", "Calibrated", "Calibration %", "Ready for IRT"],
rows,
)
return _render_admin_page("Calibration Status", "Calibration Status", body)
@router.get("/item-statistics", include_in_schema=False)
async def item_statistics_view(request: Request, db: AsyncSession = Depends(get_db)):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
result = await db.execute(select(Item.level).distinct())
levels = result.scalars().all()
rows = []
for level in levels:
item_result = await db.execute(select(Item).where(Item.level == level).order_by(Item.slot).limit(10))
items = item_result.scalars().all()
total_responses = sum(item.calibration_sample_size or 0 for item in items)
calibrated_count = sum(1 for item in items if item.calibrated)
calibration_percentage = (calibrated_count / len(items) * 100) if items else 0
avg_correctness = sum(item.ctt_p or 0 for item in items) / len(items) if items else 0
rows.append(
[
level,
len(items),
calibrated_count,
f"{calibration_percentage:.2f}%",
total_responses,
f"{avg_correctness:.4f}",
]
)
body = _table(
["Level", "Total Items", "Calibrated", "Calibration %", "Responses", "Avg Correctness"],
rows,
)
return _render_admin_page("Item Statistics", "Item Statistics", body)
@router.get("/session-overview", include_in_schema=False)
async def session_overview_view(request: Request, db: AsyncSession = Depends(get_db)):
admin = await _current_admin(request)
if not admin:
return _login_redirect()
result = await db.execute(select(Session).order_by(Session.created_at.desc()).limit(50))
sessions = result.scalars().all()
rows = [
[
session.session_id,
session.wp_user_id,
session.tryout_id,
"Yes" if session.is_completed else "No",
session.scoring_mode_used,
session.total_benar,
session.NM,
session.NN,
session.theta,
]
for session in sessions
]
body = _table(
["Session ID", "WP User", "Tryout", "Completed", "Mode", "Benar", "NM", "NN", "Theta"],
rows,
)
return _render_admin_page("Session Overview", "Session Overview", body)
def _ai_form_body(
key_configured: bool,
stats: dict[str, Any],
error: str | None = None,
success: str | None = None,
result: dict[str, Any] | None = None,
basis_items: list[Item] | None = None,
basis_item_id: str = "",
target_level: str = "mudah",
ai_model: str = settings.OPENROUTER_MODEL_QWEN,
) -> str:
error_html = f'
{escape(error)}
' if error else ""
success_html = f'
{escape(success)}
' if success else ""
options_html = "".join(
f''
for model, label in SUPPORTED_MODELS.items()
)
basis_items = basis_items or []
basis_rows = [
[
item.id,
item.tryout_id,
item.slot,
item.website_id,
_truncate(item.stem, 90),
]
for item in basis_items
]
basis_table = _table(
["Item ID", "Tryout", "Slot", "Website", "Stem"],
basis_rows,
)
seed_callout = ""
if not basis_items:
seed_callout = """
No sedang basis items found yet. Seed one demo website, tryout, and basis item to test AI generation immediately.
"""
result_html = ""
if result:
options = result.get("options") or {}
save_html = ""
if result.get("basis_item_id") and not result.get("existing_item_id"):
save_html = f"""
"""
elif result.get("existing_item_id"):
save_html = f"""
Slot {escape(str(result.get("slot", "")))} already has a {escape(str(result.get("target_level", "")))} item
for this tryout. Existing item ID: {escape(str(result.get("existing_item_id")))}.