diff --git a/app/admin_web.py b/app/admin_web.py index e210d25..b916784 100644 --- a/app/admin_web.py +++ b/app/admin_web.py @@ -118,6 +118,45 @@ def _dashboard_redirect() -> RedirectResponse: return RedirectResponse(url="/admin/dashboard", status_code=HTTP_303_SEE_OTHER) +ADMIN_NAV_ITEMS = ( + ("Dashboard", "/admin/dashboard", ("/admin/dashboard",)), + ("Websites", "/admin/websites", ("/admin/websites",)), + ("Tryout Import", "/admin/tryout-import", ("/admin/tryout-import", "/admin/snapshot-questions")), + ("Data Hierarchy", "/admin/hierarchy", ("/admin/hierarchy",)), + ("Basis Items", "/admin/basis-items", ("/admin/basis-items",)), + ("Calibration Status", "/admin/calibration-status", ("/admin/calibration-status",)), + ("Item Statistics", "/admin/item-statistics", ("/admin/item-statistics",)), + ("Session Overview", "/admin/session-overview", ("/admin/session-overview",)), + ("AI Playground", "/admin/ai-playground", ("/admin/ai-playground",)), + ("Password Info", "/admin/password", ("/admin/password",)), + ("Logout", "/admin/logout", ("/admin/logout",)), +) + + +def _is_admin_nav_active( + current_path: str, + nav_path: str, + child_prefixes: tuple[str, ...], +) -> bool: + if current_path == nav_path: + return True + for prefix in child_prefixes: + if current_path == prefix or current_path.startswith(f"{prefix}/"): + return True + return False + + +def _admin_nav_links(request: Request) -> str: + current_path = request.url.path + links = [] + for label, path, child_prefixes in ADMIN_NAV_ITEMS: + active = _is_admin_nav_active(current_path, path, child_prefixes) + class_attr = ' class="active"' if active else "" + aria = ' aria-current="page"' if active else "" + links.append(f'{escape(label)}') + return "\n ".join(links) + + def _render_auth_page( request: Request, title: str, @@ -176,6 +215,7 @@ def _render_auth_page( def _render_admin_page(request: Request, title: str, page_title: str, body: str) -> HTMLResponse: + sidebar_links = _admin_nav_links(request) html = f""" @@ -189,6 +229,7 @@ def _render_admin_page(request: Request, title: str, page_title: str, body: str) .sidebar h1 {{ font-size: 18px; margin: 0 0 24px; }} .sidebar a {{ display: block; color: #cbd5e1; text-decoration: none; padding: 10px 12px; border-radius: 8px; margin-bottom: 8px; }} .sidebar a:hover {{ background: #1e293b; color: #fff; }} + .sidebar a.active {{ background: #e2e8f0; color: #0f172a; font-weight: 800; }} .content {{ padding: 32px; }} .card {{ background: #fff; border-radius: 14px; padding: 24px; box-shadow: 0 8px 30px rgba(15, 23, 42, 0.08); }} .grid {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(180px, 1fr)); gap: 16px; margin-top: 20px; }} @@ -232,6 +273,16 @@ def _render_admin_page(request: Request, title: str, page_title: str, body: str) .question-block h3 {{ margin-top: 0; }} .option-key {{ width: 56px; font-weight: 800; color: #0f172a; }} .correct-option td {{ background: #ecfdf5; color: #166534; font-weight: 700; }} + .flow-strip {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(150px, 1fr)); gap: 10px; margin: 16px 0 22px; }} + .flow-step {{ border: 1px solid #cbd5e1; border-radius: 8px; padding: 12px; background: #f8fafc; }} + .flow-step span {{ display: block; color: #64748b; font-size: 12px; font-weight: 800; text-transform: uppercase; }} + .flow-step strong {{ display: block; margin-top: 4px; }} + .hierarchy-website {{ border: 1px solid #e2e8f0; border-radius: 8px; padding: 18px; margin-top: 18px; }} + .hierarchy-group {{ border-left: 3px solid #cbd5e1; padding-left: 16px; margin-top: 16px; }} + .hierarchy-item {{ border: 1px solid #e2e8f0; border-radius: 8px; padding: 12px; margin-top: 12px; background: #fff; }} + .badge {{ display: inline-flex; align-items: center; min-height: 22px; padding: 0 8px; border-radius: 999px; background: #e2e8f0; color: #334155; font-size: 12px; font-weight: 800; }} + .attention-list {{ margin: 10px 0 0; padding-left: 18px; color: #334155; }} + .attention-list li {{ margin: 6px 0; }} @media (max-width: 860px) {{ .layout {{ grid-template-columns: 1fr; }} .sidebar {{ position: static; }} @@ -244,16 +295,7 @@ def _render_admin_page(request: Request, title: str, page_title: str, body: str)
@@ -637,6 +679,304 @@ async def _recent_generated_variants( return list(result.scalars().all()) +async def _load_hierarchy_context(db: AsyncSession) -> dict[str, list[Any]]: + website_result = await db.execute(select(Website).order_by(Website.id.asc())) + snapshot_result = await db.execute( + select(TryoutImportSnapshot).order_by( + TryoutImportSnapshot.website_id.asc(), + TryoutImportSnapshot.source_tryout_id.asc(), + TryoutImportSnapshot.id.desc(), + ) + ) + question_result = await db.execute( + select(TryoutSnapshotQuestion).order_by( + TryoutSnapshotQuestion.website_id.asc(), + TryoutSnapshotQuestion.source_tryout_id.asc(), + TryoutSnapshotQuestion.source_question_id.asc(), + ) + ) + basis_result = await db.execute( + select(Item) + .where(Item.generated_by != "ai", Item.level == "sedang") + .order_by(Item.website_id.asc(), Item.tryout_id.asc(), Item.slot.asc(), Item.id.asc()) + ) + variant_result = await db.execute( + select(Item) + .where(Item.generated_by == "ai") + .order_by(Item.website_id.asc(), Item.basis_item_id.asc(), Item.id.desc()) + ) + run_result = await db.execute( + select(AIGenerationRun).order_by( + AIGenerationRun.basis_item_id.asc(), + AIGenerationRun.id.desc(), + ) + ) + return { + "websites": list(website_result.scalars().all()), + "snapshots": list(snapshot_result.scalars().all()), + "questions": list(question_result.scalars().all()), + "basis_items": list(basis_result.scalars().all()), + "variants": list(variant_result.scalars().all()), + "runs": list(run_result.scalars().all()), + } + + +def _append_grouped(grouped: dict[Any, list[Any]], key: Any, value: Any) -> None: + grouped.setdefault(key, []).append(value) + + +def _variant_status_counts_html(variants: list[Item]) -> str: + if not variants: + return 'No variants' + counts: dict[str, int] = {} + for variant in variants: + counts[variant.variant_status] = counts.get(variant.variant_status, 0) + 1 + return " ".join( + f"{_status_pill(status)} {count}" + for status, count in sorted(counts.items()) + ) + + +def _hierarchy_flow_strip() -> str: + steps = ( + ("1", "Website", "Owner/source site"), + ("2", "Snapshot", "Imported tryout export"), + ("3", "Source Question", "Read-only imported question"), + ("4", "Basis Item", "Promoted sedang parent"), + ("5", "Run", "AI generation request"), + ("6", "Variant", "Generated child question"), + ) + return ( + '
' + + "".join( + f'
{step}{escape(title)}

{escape(copy)}

' + for step, title, copy in steps + ) + + "
" + ) + + +def _hierarchy_attention_html( + snapshots_without_basis: list[TryoutImportSnapshot], + basis_without_variants: list[Item], + variants_without_basis: list[Item], + basis_missing_source: list[Item], +) -> str: + rows = [] + for snapshot in snapshots_without_basis: + rows.append( + f'
  • Snapshot {escape(snapshot.title)} ' + f'has no promoted basis items yet. ' + f'Open snapshot questions
  • ' + ) + for item in basis_without_variants: + rows.append( + f'
  • Basis Item #{item.id} has no generated variants yet. ' + f'Open workspace
  • ' + ) + for item in variants_without_basis: + rows.append( + f'
  • Variant #{item.id} is not linked to an existing basis item. ' + f'View variant
  • ' + ) + for item in basis_missing_source: + rows.append( + f'
  • Basis Item #{item.id} is missing a source snapshot question reference. ' + f'Open workspace
  • ' + ) + + if not rows: + return """ +
    No hierarchy gaps detected in the current data.
    + """ + + return f""" +
    +

    Needs Attention

    +
      {"".join(rows)}
    +
    + """ + + +def _basis_hierarchy_item_html( + basis_item: Item, + source_question: TryoutSnapshotQuestion | None, + variants: list[Item], + runs: list[AIGenerationRun], +) -> str: + latest_run = runs[0] if runs else None + latest_variant_links = [] + for variant in variants[:3]: + latest_variant_links.append( + f'' + f'Variant #{variant.id}' + ) + source_label = "-" + if source_question is not None: + source_label = ( + f"{escape(source_question.source_question_id)} | " + f"{escape(_truncate(_html_to_text(source_question.question_title or source_question.question_html), 120))}" + ) + run_html = "-" + if latest_run is not None: + run_html = ( + f'Run #{latest_run.id} | {escape(latest_run.target_level)} | ' + f'{latest_run.requested_count} requested | ' + f'Review run' + ) + variant_links_html = " ".join(latest_variant_links) if latest_variant_links else 'No variant detail links yet.' + + return f""" +
    +

    Basis Item #{basis_item.id} | Slot {basis_item.slot} | Tryout {escape(basis_item.tryout_id)}

    +

    Source question: {source_label}

    +

    Stem: {escape(_truncate(_html_to_text(basis_item.stem), 180))}

    +

    Variants: {_variant_status_counts_html(variants)}

    +

    Latest run: {run_html}

    +
    + Basis workspace + Review variants + {variant_links_html} +
    +
    + """ + + +def _hierarchy_view_body(context: dict[str, list[Any]]) -> str: + websites: list[Website] = context["websites"] + snapshots: list[TryoutImportSnapshot] = context["snapshots"] + questions: list[TryoutSnapshotQuestion] = context["questions"] + basis_items: list[Item] = context["basis_items"] + variants: list[Item] = context["variants"] + runs: list[AIGenerationRun] = context["runs"] + + snapshots_by_website: dict[int, list[TryoutImportSnapshot]] = {} + questions_by_website: dict[int, list[TryoutSnapshotQuestion]] = {} + questions_by_snapshot: dict[int, list[TryoutSnapshotQuestion]] = {} + questions_by_id = {question.id: question for question in questions} + basis_by_website: dict[int, list[Item]] = {} + basis_by_source_question: dict[int, list[Item]] = {} + variants_by_website: dict[int, list[Item]] = {} + variants_by_basis: dict[int, list[Item]] = {} + runs_by_basis: dict[int, list[AIGenerationRun]] = {} + basis_by_id = {item.id: item for item in basis_items} + + for snapshot in snapshots: + _append_grouped(snapshots_by_website, snapshot.website_id, snapshot) + for question in questions: + _append_grouped(questions_by_website, question.website_id, question) + if question.latest_snapshot_id is not None: + _append_grouped(questions_by_snapshot, question.latest_snapshot_id, question) + for item in basis_items: + _append_grouped(basis_by_website, item.website_id, item) + if item.source_snapshot_question_id is not None: + _append_grouped(basis_by_source_question, item.source_snapshot_question_id, item) + for variant in variants: + _append_grouped(variants_by_website, variant.website_id, variant) + if variant.basis_item_id is not None: + _append_grouped(variants_by_basis, variant.basis_item_id, variant) + for run in runs: + _append_grouped(runs_by_basis, run.basis_item_id, run) + + snapshots_without_basis = [] + for snapshot in snapshots: + snapshot_question_ids = {question.id for question in questions_by_snapshot.get(snapshot.id, [])} + linked_basis = [ + item + for question_id in snapshot_question_ids + for item in basis_by_source_question.get(question_id, []) + ] + if not linked_basis: + snapshots_without_basis.append(snapshot) + + basis_without_variants = [item for item in basis_items if not variants_by_basis.get(item.id)] + variants_without_basis = [ + item + for item in variants + if item.basis_item_id is None or item.basis_item_id not in basis_by_id + ] + basis_missing_source = [item for item in basis_items if item.source_snapshot_question_id is None] + + website_sections = [] + for website in websites: + website_snapshots = snapshots_by_website.get(website.id, []) + website_questions = questions_by_website.get(website.id, []) + website_basis = basis_by_website.get(website.id, []) + website_variants = variants_by_website.get(website.id, []) + website_runs = [ + run + for item in website_basis + for run in runs_by_basis.get(item.id, []) + ] + snapshot_groups = [] + for snapshot in website_snapshots: + snapshot_questions = questions_by_snapshot.get(snapshot.id, []) + snapshot_question_ids = {question.id for question in snapshot_questions} + snapshot_basis = sorted( + [ + item + for question_id in snapshot_question_ids + for item in basis_by_source_question.get(question_id, []) + ], + key=lambda item: (item.slot, item.id), + ) + basis_html = ( + "".join( + _basis_hierarchy_item_html( + item, + questions_by_id.get(item.source_snapshot_question_id), + variants_by_basis.get(item.id, []), + runs_by_basis.get(item.id, []), + ) + for item in snapshot_basis + ) + if snapshot_basis + else '

    No promoted basis items for this snapshot yet.

    ' + ) + snapshot_groups.append( + f""" +
    +

    Snapshot {escape(snapshot.title)}

    +

    Tryout: {escape(snapshot.source_tryout_id)} | Snapshot #{snapshot.id} | Imported: {escape(str(snapshot.created_at))}

    +

    Questions in export: {snapshot.question_count} | Current source rows: {len(snapshot_questions)} | Promoted basis items: {len(snapshot_basis)}

    + + {basis_html} +
    + """ + ) + + website_sections.append( + f""" +
    +

    Website {escape(website.site_name)} | #{website.id}

    +

    {escape(website.site_url)}

    +
    +
    Snapshots{len(website_snapshots)}
    +
    Source Questions{len(website_questions)}
    +
    Basis Items{len(website_basis)}
    +
    AI Runs{len(website_runs)}
    +
    Variants{len(website_variants)}
    +
    + {"".join(snapshot_groups) if snapshot_groups else '

    No tryout imports have been recorded for this website yet.

    '} +
    + """ + ) + + if not website_sections: + website_sections.append( + '

    No websites have been registered yet.

    ' + ) + + return f""" +

    This read-only view shows how source tryout data becomes reviewable AI-generated question variants.

    + {_hierarchy_flow_strip()} + {_hierarchy_attention_html(snapshots_without_basis, basis_without_variants, variants_without_basis, basis_missing_source)} + {"".join(website_sections)} + """ + + async def _usage_metrics_for_items( db: AsyncSession, item_ids: list[int], @@ -1336,6 +1676,17 @@ async def dashboard_view(request: Request, db: AsyncSession = Depends(get_db)): return _render_admin_page(request, "IRT Bank Soal Admin", "Dashboard", body) +@router.get("/hierarchy", include_in_schema=False) +async def hierarchy_view(request: Request, db: AsyncSession = Depends(get_db)): + admin = await _current_admin(request) + if not admin: + return _login_redirect() + + context = await _load_hierarchy_context(db) + body = _hierarchy_view_body(context) + return _render_admin_page(request, "Data Hierarchy", "Data Hierarchy", body) + + @router.get("/websites", include_in_schema=False) async def websites_view(request: Request, db: AsyncSession = Depends(get_db)): admin = await _current_admin(request)