diff --git a/app/admin_web.py b/app/admin_web.py index 1c41510..cc06008 100644 --- a/app/admin_web.py +++ b/app/admin_web.py @@ -22,7 +22,7 @@ from starlette.status import HTTP_303_SEE_OTHER, HTTP_401_UNAUTHORIZED from app.core.config import get_settings from app.database import get_db -from app.models import Item, Session, Tryout, TryoutImportSnapshot, Website +from app.models import Item, Session, Tryout, TryoutImportSnapshot, TryoutSnapshotQuestion, Website from app.services.ai_generation import ( SUPPORTED_MODELS, generate_question, @@ -320,20 +320,24 @@ def _tryout_import_form_body( ) website_map = {website.id: website.site_name for website in websites} - snapshot_rows = [ - [ - snapshot.id, - f'{website_map.get(snapshot.website_id, "Unknown")} (#{snapshot.website_id})', - snapshot.source_tryout_id, - snapshot.title, - snapshot.question_count, - snapshot.created_at, - ] - for snapshot in recent_snapshots - ] - snapshots_table = _table( - ["Snapshot ID", "Website", "Tryout ID", "Title", "Questions", "Imported At"], - snapshot_rows, + snapshot_rows = [] + for snapshot in recent_snapshots: + snapshot_rows.append( + "" + f"{snapshot.id}" + f"{escape(website_map.get(snapshot.website_id, 'Unknown'))} (#{snapshot.website_id})" + f"{escape(snapshot.source_tryout_id)}" + f"{escape(snapshot.title)}" + f"{snapshot.question_count}" + f"{escape(str(snapshot.created_at))}" + f"Browse" + "" + ) + snapshots_table = ( + "" + + ("".join(snapshot_rows) if snapshot_rows else "") + + "
Snapshot IDWebsiteTryout IDTitleQuestionsImported AtActions
No data
" ) preview_html = "" @@ -401,6 +405,81 @@ def _tryout_import_form_body( """ +def _snapshot_slot_map(snapshot: TryoutImportSnapshot) -> dict[str, int]: + slot_map: dict[str, int] = {} + questions = (snapshot.raw_payload or {}).get("questions") or [] + for index, question in enumerate(questions, start=1): + source_question_id = str((question or {}).get("id") or "").strip() + if source_question_id: + slot_map[source_question_id] = index + return slot_map + + +def _snapshot_options_to_item_options(raw_options: list[dict[str, Any]] | list[Any]) -> dict[str, str]: + item_options: dict[str, str] = {} + for option in raw_options or []: + if not isinstance(option, dict): + continue + increment = str(option.get("increment") or "").strip().upper() + text = str(option.get("text") or option.get("label") or "").strip() + if increment and text: + item_options[increment] = text + return item_options + + +def _snapshot_questions_body( + snapshot: TryoutImportSnapshot, + questions: list[TryoutSnapshotQuestion], + promoted_items_by_slot: dict[int, Item], + error: str | None = None, + success: str | None = None, +) -> str: + error_html = f'
{escape(error)}
' if error else "" + success_html = f'
{escape(success)}
' if success else "" + slot_map = _snapshot_slot_map(snapshot) + rows = [] + for question in questions: + slot = slot_map.get(question.source_question_id, 0) + promoted_item = promoted_items_by_slot.get(slot) + if promoted_item: + action_html = ( + f'Item #{promoted_item.id} already exists. ' + f'Open in AI Playground' + ) + else: + action_html = ( + f'
' + f'' + f'' + '' + '
' + ) + rows.append( + "" + f"{slot or '-'}" + f"{escape(question.source_question_id)}" + f"{escape(question.correct_answer)}" + f"{question.option_count}" + f"{'Yes' if question.is_active else 'No'}" + f"{escape(_truncate(question.question_title or question.question_html, 100))}" + f"{action_html}" + "" + ) + questions_table = ( + "" + + ("".join(rows) if rows else "") + + "
SlotSource Question IDCorrectOptionsActiveStemAction
No data
" + ) + return f""" +

Snapshot ID: {snapshot.id} | Website: {snapshot.website_id} | Tryout: {escape(snapshot.source_tryout_id)}

+

Promote selected snapshot questions into the live items table as sedang basis items for AI generation.

+ {success_html} + {error_html} + {questions_table} +

Back to Tryout Import

+ """ + + async def _basis_items_for_playground(db: AsyncSession, limit: int = 20) -> list[Item]: result = await db.execute( select(Item) @@ -492,6 +571,32 @@ async def _recent_snapshots(db: AsyncSession, limit: int = 20) -> list[TryoutImp return list(result.scalars().all()) +async def _ensure_operational_tryout(snapshot: TryoutImportSnapshot, db: AsyncSession) -> Tryout: + result = await db.execute( + select(Tryout).where( + Tryout.website_id == snapshot.website_id, + Tryout.tryout_id == snapshot.source_tryout_id, + ) + ) + tryout = result.scalar_one_or_none() + if tryout: + return tryout + + tryout = Tryout( + website_id=snapshot.website_id, + tryout_id=snapshot.source_tryout_id, + name=snapshot.title, + description=f"Operational tryout basis created from imported snapshot #{snapshot.id}.", + scoring_mode="ctt", + selection_mode="fixed", + normalization_mode="static", + ai_generation_enabled=True, + ) + db.add(tryout) + await db.flush() + return tryout + + @router.get("", include_in_schema=False) @router.get("/", include_in_schema=False) async def admin_root(request: Request): @@ -990,6 +1095,184 @@ async def tryout_import_submit( return _render_admin_page("Tryout Import", "Tryout Import", body) +@router.get("/snapshot-questions", include_in_schema=False) +async def snapshot_questions_view( + request: Request, + snapshot_id: int, + db: AsyncSession = Depends(get_db), +): + admin = await _current_admin(request) + if not admin: + return _login_redirect() + + snapshot = await db.get(TryoutImportSnapshot, snapshot_id) + if snapshot is None: + websites = await _load_websites(db) + snapshots = await _recent_snapshots(db) + body = _tryout_import_form_body( + websites, + snapshots, + error=f"Snapshot not found: {snapshot_id}", + ) + return _render_admin_page("Tryout Import", "Tryout Import", body) + + result = await db.execute( + select(TryoutSnapshotQuestion) + .where( + TryoutSnapshotQuestion.website_id == snapshot.website_id, + TryoutSnapshotQuestion.source_tryout_id == snapshot.source_tryout_id, + ) + .order_by(TryoutSnapshotQuestion.source_question_id.asc()) + ) + questions = list(result.scalars().all()) + slot_map = _snapshot_slot_map(snapshot) + item_result = await db.execute( + select(Item).where( + Item.website_id == snapshot.website_id, + Item.tryout_id == snapshot.source_tryout_id, + Item.level == "sedang", + ) + ) + promoted_items = list(item_result.scalars().all()) + promoted_items_by_slot = {item.slot: item for item in promoted_items} + questions.sort(key=lambda question: (slot_map.get(question.source_question_id, 10**9), question.source_question_id)) + body = _snapshot_questions_body(snapshot, questions, promoted_items_by_slot) + return _render_admin_page("Snapshot Questions", "Snapshot Questions", body) + + +@router.post("/snapshot-questions/promote", include_in_schema=False) +async def snapshot_question_promote( + request: Request, + snapshot_id: int = Form(...), + snapshot_question_id: int = Form(...), + db: AsyncSession = Depends(get_db), +): + admin = await _current_admin(request) + if not admin: + return _login_redirect() + + snapshot = await db.get(TryoutImportSnapshot, snapshot_id) + question = await db.get(TryoutSnapshotQuestion, snapshot_question_id) + if snapshot is None or question is None: + websites = await _load_websites(db) + snapshots = await _recent_snapshots(db) + body = _tryout_import_form_body( + websites, + snapshots, + error="Snapshot or snapshot question not found.", + ) + return _render_admin_page("Tryout Import", "Tryout Import", body) + + if ( + question.website_id != snapshot.website_id + or question.source_tryout_id != snapshot.source_tryout_id + ): + body = _snapshot_questions_body(snapshot, [], {}, error="Snapshot question does not belong to the selected snapshot.") + return _render_admin_page("Snapshot Questions", "Snapshot Questions", body) + + slot_map = _snapshot_slot_map(snapshot) + slot = slot_map.get(question.source_question_id) + if not slot: + max_slot = ( + await db.scalar( + select(func.max(Item.slot)).where( + Item.website_id == snapshot.website_id, + Item.tryout_id == snapshot.source_tryout_id, + Item.level == "sedang", + ) + ) + or 0 + ) + slot = max_slot + 1 + + options = _snapshot_options_to_item_options(question.raw_options) + if not options: + item_result = await db.execute( + select(Item).where( + Item.website_id == snapshot.website_id, + Item.tryout_id == snapshot.source_tryout_id, + Item.level == "sedang", + ) + ) + promoted_items_by_slot = {item.slot: item for item in item_result.scalars().all()} + question_result = await db.execute( + select(TryoutSnapshotQuestion) + .where( + TryoutSnapshotQuestion.website_id == snapshot.website_id, + TryoutSnapshotQuestion.source_tryout_id == snapshot.source_tryout_id, + ) + .order_by(TryoutSnapshotQuestion.source_question_id.asc()) + ) + questions = list(question_result.scalars().all()) + questions.sort(key=lambda row: (slot_map.get(row.source_question_id, 10**9), row.source_question_id)) + body = _snapshot_questions_body( + snapshot, + questions, + promoted_items_by_slot, + error="Snapshot question has no usable option text, so it cannot be promoted into the live item bank.", + ) + return _render_admin_page("Snapshot Questions", "Snapshot Questions", body) + + await _ensure_operational_tryout(snapshot, db) + + existing_item_result = await db.execute( + select(Item).where( + Item.website_id == snapshot.website_id, + Item.tryout_id == snapshot.source_tryout_id, + Item.slot == slot, + Item.level == "sedang", + ) + ) + existing_item = existing_item_result.scalar_one_or_none() + if existing_item is None: + existing_item = Item( + tryout_id=snapshot.source_tryout_id, + website_id=snapshot.website_id, + slot=slot, + level="sedang", + stem=question.question_html, + options=options, + correct_answer=question.correct_answer, + explanation=question.explanation_html, + generated_by="manual", + calibrated=False, + calibration_sample_size=0, + ) + db.add(existing_item) + await db.commit() + await db.refresh(existing_item) + success_message = ( + f"Snapshot question {question.source_question_id} promoted successfully as Item #{existing_item.id}. " + f"Use that Basis Item ID in AI Playground." + ) + else: + success_message = ( + f"Snapshot question {question.source_question_id} is already available as Item #{existing_item.id}. " + f"Use that Basis Item ID in AI Playground." + ) + + question_result = await db.execute( + select(TryoutSnapshotQuestion) + .where( + TryoutSnapshotQuestion.website_id == snapshot.website_id, + TryoutSnapshotQuestion.source_tryout_id == snapshot.source_tryout_id, + ) + .order_by(TryoutSnapshotQuestion.source_question_id.asc()) + ) + questions = list(question_result.scalars().all()) + item_result = await db.execute( + select(Item).where( + Item.website_id == snapshot.website_id, + Item.tryout_id == snapshot.source_tryout_id, + Item.level == "sedang", + ) + ) + promoted_items_by_slot = {item.slot: item for item in item_result.scalars().all()} + questions.sort(key=lambda row: (slot_map.get(row.source_question_id, 10**9), row.source_question_id)) + body = _snapshot_questions_body(snapshot, questions, promoted_items_by_slot, success=success_message) + return _render_admin_page("Snapshot Questions", "Snapshot Questions", body) + + @router.get("/calibration-status", include_in_schema=False) async def calibration_status_view(request: Request, db: AsyncSession = Depends(get_db)): admin = await _current_admin(request) @@ -1201,7 +1484,13 @@ async def ai_playground_view(request: Request, db: AsyncSession = Depends(get_db stats = await get_ai_stats(db) basis_items = await _basis_items_for_playground(db) - body = _ai_form_body(bool(settings.OPENROUTER_API_KEY), stats, basis_items=basis_items) + basis_item_id = request.query_params.get("basis_item_id", "") + body = _ai_form_body( + bool(settings.OPENROUTER_API_KEY), + stats, + basis_items=basis_items, + basis_item_id=str(basis_item_id or ""), + ) return _render_admin_page("AI Playground", "AI Playground", body)