diff --git a/app/admin_web.py b/app/admin_web.py index cc06008..4efc069 100644 --- a/app/admin_web.py +++ b/app/admin_web.py @@ -442,20 +442,17 @@ def _snapshot_questions_body( slot = slot_map.get(question.source_question_id, 0) promoted_item = promoted_items_by_slot.get(slot) if promoted_item: + select_html = "" action_html = ( f'Item #{promoted_item.id} already exists. ' f'Open in AI Playground' ) else: - action_html = ( - f'
' - f'' - f'' - '' - '
' - ) + select_html = f'' + action_html = "Ready to promote" rows.append( "" + f"{select_html}" f"{slot or '-'}" f"{escape(question.source_question_id)}" f"{escape(question.correct_answer)}" @@ -466,9 +463,15 @@ def _snapshot_questions_body( "" ) questions_table = ( - "" - + ("".join(rows) if rows else "") + "" + f"" + "
" + "" + "
" + "
SlotSource Question IDCorrectOptionsActiveStemAction
No data
" + + ("".join(rows) if rows else "") + "
el.checked = this.checked)\">SlotSource Question IDCorrectOptionsActiveStemAction
No data
" + "" ) return f"""

Snapshot ID: {snapshot.id} | Website: {snapshot.website_id} | Tryout: {escape(snapshot.source_tryout_id)}

@@ -597,6 +600,94 @@ async def _ensure_operational_tryout(snapshot: TryoutImportSnapshot, db: AsyncSe return tryout +async def _load_snapshot_question_context( + snapshot: TryoutImportSnapshot, + db: AsyncSession, +) -> tuple[list[TryoutSnapshotQuestion], dict[int, Item], dict[str, int]]: + question_result = await db.execute( + select(TryoutSnapshotQuestion) + .where( + TryoutSnapshotQuestion.website_id == snapshot.website_id, + TryoutSnapshotQuestion.source_tryout_id == snapshot.source_tryout_id, + ) + .order_by(TryoutSnapshotQuestion.source_question_id.asc()) + ) + questions = list(question_result.scalars().all()) + item_result = await db.execute( + select(Item).where( + Item.website_id == snapshot.website_id, + Item.tryout_id == snapshot.source_tryout_id, + Item.level == "sedang", + ) + ) + promoted_items_by_slot = {item.slot: item for item in item_result.scalars().all()} + slot_map = _snapshot_slot_map(snapshot) + questions.sort(key=lambda row: (slot_map.get(row.source_question_id, 10**9), row.source_question_id)) + return questions, promoted_items_by_slot, slot_map + + +async def _promote_snapshot_question_to_item( + snapshot: TryoutImportSnapshot, + question: TryoutSnapshotQuestion, + db: AsyncSession, +) -> tuple[Item | None, str]: + if ( + question.website_id != snapshot.website_id + or question.source_tryout_id != snapshot.source_tryout_id + ): + return None, "mismatch" + + slot_map = _snapshot_slot_map(snapshot) + slot = slot_map.get(question.source_question_id) + if not slot: + max_slot = ( + await db.scalar( + select(func.max(Item.slot)).where( + Item.website_id == snapshot.website_id, + Item.tryout_id == snapshot.source_tryout_id, + Item.level == "sedang", + ) + ) + or 0 + ) + slot = max_slot + 1 + + options = _snapshot_options_to_item_options(question.raw_options) + if not options: + return None, "missing_options" + + await _ensure_operational_tryout(snapshot, db) + + existing_item_result = await db.execute( + select(Item).where( + Item.website_id == snapshot.website_id, + Item.tryout_id == snapshot.source_tryout_id, + Item.slot == slot, + Item.level == "sedang", + ) + ) + existing_item = existing_item_result.scalar_one_or_none() + if existing_item is not None: + return existing_item, "existing" + + item = Item( + tryout_id=snapshot.source_tryout_id, + website_id=snapshot.website_id, + slot=slot, + level="sedang", + stem=question.question_html, + options=options, + correct_answer=question.correct_answer, + explanation=question.explanation_html, + generated_by="manual", + calibrated=False, + calibration_sample_size=0, + ) + db.add(item) + await db.flush() + return item, "created" + + @router.get("", include_in_schema=False) @router.get("/", include_in_schema=False) async def admin_root(request: Request): @@ -1116,35 +1207,16 @@ async def snapshot_questions_view( ) return _render_admin_page("Tryout Import", "Tryout Import", body) - result = await db.execute( - select(TryoutSnapshotQuestion) - .where( - TryoutSnapshotQuestion.website_id == snapshot.website_id, - TryoutSnapshotQuestion.source_tryout_id == snapshot.source_tryout_id, - ) - .order_by(TryoutSnapshotQuestion.source_question_id.asc()) - ) - questions = list(result.scalars().all()) - slot_map = _snapshot_slot_map(snapshot) - item_result = await db.execute( - select(Item).where( - Item.website_id == snapshot.website_id, - Item.tryout_id == snapshot.source_tryout_id, - Item.level == "sedang", - ) - ) - promoted_items = list(item_result.scalars().all()) - promoted_items_by_slot = {item.slot: item for item in promoted_items} - questions.sort(key=lambda question: (slot_map.get(question.source_question_id, 10**9), question.source_question_id)) + questions, promoted_items_by_slot, _ = await _load_snapshot_question_context(snapshot, db) body = _snapshot_questions_body(snapshot, questions, promoted_items_by_slot) return _render_admin_page("Snapshot Questions", "Snapshot Questions", body) -@router.post("/snapshot-questions/promote", include_in_schema=False) -async def snapshot_question_promote( +@router.post("/snapshot-questions/promote-bulk", include_in_schema=False) +async def snapshot_question_promote_bulk( request: Request, snapshot_id: int = Form(...), - snapshot_question_id: int = Form(...), + snapshot_question_ids: list[int] | None = Form(None), db: AsyncSession = Depends(get_db), ): admin = await _current_admin(request) @@ -1152,123 +1224,65 @@ async def snapshot_question_promote( return _login_redirect() snapshot = await db.get(TryoutImportSnapshot, snapshot_id) - question = await db.get(TryoutSnapshotQuestion, snapshot_question_id) - if snapshot is None or question is None: + if snapshot is None: websites = await _load_websites(db) snapshots = await _recent_snapshots(db) body = _tryout_import_form_body( websites, snapshots, - error="Snapshot or snapshot question not found.", + error=f"Snapshot not found: {snapshot_id}", ) return _render_admin_page("Tryout Import", "Tryout Import", body) - if ( - question.website_id != snapshot.website_id - or question.source_tryout_id != snapshot.source_tryout_id - ): - body = _snapshot_questions_body(snapshot, [], {}, error="Snapshot question does not belong to the selected snapshot.") - return _render_admin_page("Snapshot Questions", "Snapshot Questions", body) - - slot_map = _snapshot_slot_map(snapshot) - slot = slot_map.get(question.source_question_id) - if not slot: - max_slot = ( - await db.scalar( - select(func.max(Item.slot)).where( - Item.website_id == snapshot.website_id, - Item.tryout_id == snapshot.source_tryout_id, - Item.level == "sedang", - ) - ) - or 0 - ) - slot = max_slot + 1 - - options = _snapshot_options_to_item_options(question.raw_options) - if not options: - item_result = await db.execute( - select(Item).where( - Item.website_id == snapshot.website_id, - Item.tryout_id == snapshot.source_tryout_id, - Item.level == "sedang", - ) - ) - promoted_items_by_slot = {item.slot: item for item in item_result.scalars().all()} - question_result = await db.execute( - select(TryoutSnapshotQuestion) - .where( - TryoutSnapshotQuestion.website_id == snapshot.website_id, - TryoutSnapshotQuestion.source_tryout_id == snapshot.source_tryout_id, - ) - .order_by(TryoutSnapshotQuestion.source_question_id.asc()) - ) - questions = list(question_result.scalars().all()) - questions.sort(key=lambda row: (slot_map.get(row.source_question_id, 10**9), row.source_question_id)) + if not snapshot_question_ids: + questions, promoted_items_by_slot, _ = await _load_snapshot_question_context(snapshot, db) body = _snapshot_questions_body( snapshot, questions, promoted_items_by_slot, - error="Snapshot question has no usable option text, so it cannot be promoted into the live item bank.", + error="Select at least one snapshot question to promote.", ) return _render_admin_page("Snapshot Questions", "Snapshot Questions", body) - await _ensure_operational_tryout(snapshot, db) - - existing_item_result = await db.execute( - select(Item).where( - Item.website_id == snapshot.website_id, - Item.tryout_id == snapshot.source_tryout_id, - Item.slot == slot, - Item.level == "sedang", - ) - ) - existing_item = existing_item_result.scalar_one_or_none() - if existing_item is None: - existing_item = Item( - tryout_id=snapshot.source_tryout_id, - website_id=snapshot.website_id, - slot=slot, - level="sedang", - stem=question.question_html, - options=options, - correct_answer=question.correct_answer, - explanation=question.explanation_html, - generated_by="manual", - calibrated=False, - calibration_sample_size=0, - ) - db.add(existing_item) - await db.commit() - await db.refresh(existing_item) - success_message = ( - f"Snapshot question {question.source_question_id} promoted successfully as Item #{existing_item.id}. " - f"Use that Basis Item ID in AI Playground." - ) - else: - success_message = ( - f"Snapshot question {question.source_question_id} is already available as Item #{existing_item.id}. " - f"Use that Basis Item ID in AI Playground." - ) - question_result = await db.execute( - select(TryoutSnapshotQuestion) - .where( - TryoutSnapshotQuestion.website_id == snapshot.website_id, - TryoutSnapshotQuestion.source_tryout_id == snapshot.source_tryout_id, - ) - .order_by(TryoutSnapshotQuestion.source_question_id.asc()) - ) - questions = list(question_result.scalars().all()) - item_result = await db.execute( - select(Item).where( - Item.website_id == snapshot.website_id, - Item.tryout_id == snapshot.source_tryout_id, - Item.level == "sedang", + select(TryoutSnapshotQuestion).where( + TryoutSnapshotQuestion.id.in_(snapshot_question_ids) ) ) - promoted_items_by_slot = {item.slot: item for item in item_result.scalars().all()} - questions.sort(key=lambda row: (slot_map.get(row.source_question_id, 10**9), row.source_question_id)) + selected_questions = list(question_result.scalars().all()) + + created_items: list[Item] = [] + existing_items: list[Item] = [] + missing_option_count = 0 + mismatch_count = 0 + + for question in selected_questions: + item, status = await _promote_snapshot_question_to_item(snapshot, question, db) + if status == "created" and item is not None: + created_items.append(item) + elif status == "existing" and item is not None: + existing_items.append(item) + elif status == "missing_options": + missing_option_count += 1 + elif status == "mismatch": + mismatch_count += 1 + + await db.commit() + + questions, promoted_items_by_slot, _ = await _load_snapshot_question_context(snapshot, db) + success_parts = [] + if created_items: + success_parts.append(f"created {len(created_items)} item(s)") + if existing_items: + success_parts.append(f"reused {len(existing_items)} existing item(s)") + if missing_option_count: + success_parts.append(f"skipped {missing_option_count} question(s) with missing option text") + if mismatch_count: + success_parts.append(f"skipped {mismatch_count} mismatched question(s)") + success_message = "Bulk promote finished: " + ", ".join(success_parts) + "." + if created_items: + success_message += f" Latest basis item ID: {created_items[-1].id}." + body = _snapshot_questions_body(snapshot, questions, promoted_items_by_slot, success=success_message) return _render_admin_page("Snapshot Questions", "Snapshot Questions", body)