""" CAT (Computerized Adaptive Testing) Selection Service. Implements adaptive item selection algorithms for IRT-based testing. Supports three modes: CTT (fixed), IRT (adaptive), and hybrid. """ import math from dataclasses import dataclass from datetime import datetime from typing import Literal, Optional from sqlalchemy import and_, not_, or_, select, func from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from app.models import Item, Session, Tryout, UserAnswer from app.services.irt_calibration import ( calculate_item_information, estimate_b_from_ctt_p, estimate_theta_mle, update_theta_after_response, ) class CATSelectionError(Exception): """Exception raised for CAT selection errors.""" pass @dataclass class NextItemResult: """Result of next item selection.""" item: Optional[Item] selection_method: str # 'fixed', 'adaptive', 'hybrid' slot: Optional[int] level: Optional[str] reason: str # Why this item was selected @dataclass class TerminationCheck: """Result of termination condition check.""" should_terminate: bool reason: str items_answered: int current_se: Optional[float] max_items: Optional[int] se_threshold_met: bool # Default SE threshold for termination DEFAULT_SE_THRESHOLD = 0.5 # Default max items if not configured DEFAULT_MAX_ITEMS = 50 async def get_next_item_fixed( db: AsyncSession, session_id: str, tryout_id: str, website_id: int, level_filter: Optional[str] = None ) -> NextItemResult: """ Get next item in fixed order (CTT mode). Returns items in slot order (1, 2, 3, ...). Filters by level if specified. Checks if student already answered this item. Args: db: Database session session_id: Session identifier tryout_id: Tryout identifier website_id: Website identifier level_filter: Optional difficulty level filter ('mudah', 'sedang', 'sulit') Returns: NextItemResult with selected item or None if no more items """ # Get session to find current position and answered items session_query = select(Session).where(Session.session_id == session_id) session_result = await db.execute(session_query) session = session_result.scalar_one_or_none() if not session: raise CATSelectionError(f"Session {session_id} not found") # Get all item IDs already answered by this user in this session answered_query = select(UserAnswer.item_id).where( UserAnswer.session_id == session_id ) answered_result = await db.execute(answered_query) answered_item_ids = [row[0] for row in answered_result.all()] # Build query for available items query = ( select(Item) .where( Item.tryout_id == tryout_id, Item.website_id == website_id ) .order_by(Item.slot, Item.level) ) # Apply level filter if specified if level_filter: query = query.where(Item.level == level_filter) # Exclude already answered items if answered_item_ids: query = query.where(not_(Item.id.in_(answered_item_ids))) result = await db.execute(query) items = result.scalars().all() if not items: return NextItemResult( item=None, selection_method="fixed", slot=None, level=None, reason="No more items available" ) # Return first available item (lowest slot) next_item = items[0] return NextItemResult( item=next_item, selection_method="fixed", slot=next_item.slot, level=next_item.level, reason=f"Fixed order selection - slot {next_item.slot}" ) async def get_next_item_adaptive( db: AsyncSession, session_id: str, tryout_id: str, website_id: int, ai_generation_enabled: bool = False, level_filter: Optional[str] = None ) -> NextItemResult: """ Get next item using adaptive selection (IRT mode). Finds item where b ≈ current theta. Only uses calibrated items (calibrated=True). Filters: student hasn't answered this item. Filters: AI-generated items only if AI generation is enabled. Args: db: Database session session_id: Session identifier tryout_id: Tryout identifier website_id: Website identifier ai_generation_enabled: Whether to include AI-generated items level_filter: Optional difficulty level filter Returns: NextItemResult with selected item or None if no suitable items """ # Get session for current theta session_query = select(Session).where(Session.session_id == session_id) session_result = await db.execute(session_query) session = session_result.scalar_one_or_none() if not session: raise CATSelectionError(f"Session {session_id} not found") # Get current theta (default to 0.0 for first item) current_theta = session.theta if session.theta is not None else 0.0 # Get all item IDs already answered by this user in this session answered_query = select(UserAnswer.item_id).where( UserAnswer.session_id == session_id ) answered_result = await db.execute(answered_query) answered_item_ids = [row[0] for row in answered_result.all()] # Build query for available calibrated items query = ( select(Item) .where( Item.tryout_id == tryout_id, Item.website_id == website_id, Item.calibrated == True # Only calibrated items for IRT ) ) # Apply level filter if specified if level_filter: query = query.where(Item.level == level_filter) # Exclude already answered items if answered_item_ids: query = query.where(not_(Item.id.in_(answered_item_ids))) # Filter AI-generated items if AI generation is disabled if not ai_generation_enabled: query = query.where(Item.generated_by == 'manual') result = await db.execute(query) items = result.scalars().all() if not items: return NextItemResult( item=None, selection_method="adaptive", slot=None, level=None, reason="No calibrated items available" ) # Find item with b closest to current theta # Also consider item information (prefer items with higher information at current theta) best_item = None best_score = float('inf') for item in items: if item.irt_b is None: # Skip items without b parameter (shouldn't happen with calibrated=True) continue # Calculate distance from theta b_distance = abs(item.irt_b - current_theta) # Calculate item information at current theta information = calculate_item_information(current_theta, item.irt_b) # Score: minimize distance, maximize information # Use weighted combination: lower score is better # Add small penalty for lower information score = b_distance - (0.1 * information) if score < best_score: best_score = score best_item = item if not best_item: return NextItemResult( item=None, selection_method="adaptive", slot=None, level=None, reason="No items with valid IRT parameters available" ) return NextItemResult( item=best_item, selection_method="adaptive", slot=best_item.slot, level=best_item.level, reason=f"Adaptive selection - b={best_item.irt_b:.3f} ≈ θ={current_theta:.3f}" ) async def get_next_item_hybrid( db: AsyncSession, session_id: str, tryout_id: str, website_id: int, hybrid_transition_slot: int = 10, ai_generation_enabled: bool = False, level_filter: Optional[str] = None ) -> NextItemResult: """ Get next item using hybrid selection. Uses fixed order for first N items, then switches to adaptive. Falls back to CTT if no calibrated items available. Args: db: Database session session_id: Session identifier tryout_id: Tryout identifier website_id: Website identifier hybrid_transition_slot: Slot number to transition from fixed to adaptive ai_generation_enabled: Whether to include AI-generated items level_filter: Optional difficulty level filter Returns: NextItemResult with selected item or None if no items available """ # Get session to check current position session_query = select(Session).where(Session.session_id == session_id) session_result = await db.execute(session_query) session = session_result.scalar_one_or_none() if not session: raise CATSelectionError(f"Session {session_id} not found") # Count answered items to determine current position count_query = select(func.count(UserAnswer.id)).where( UserAnswer.session_id == session_id ) count_result = await db.execute(count_query) items_answered = count_result.scalar() or 0 # Determine current slot (next slot to fill) current_slot = items_answered + 1 # Check if we're still in fixed phase if current_slot <= hybrid_transition_slot: # Use fixed selection for initial items result = await get_next_item_fixed( db, session_id, tryout_id, website_id, level_filter ) result.selection_method = "hybrid_fixed" result.reason = f"Hybrid mode (fixed phase) - slot {current_slot}" return result # Try adaptive selection adaptive_result = await get_next_item_adaptive( db, session_id, tryout_id, website_id, ai_generation_enabled, level_filter ) if adaptive_result.item is not None: adaptive_result.selection_method = "hybrid_adaptive" adaptive_result.reason = f"Hybrid mode (adaptive phase) - {adaptive_result.reason}" return adaptive_result # Fallback to fixed selection if no calibrated items available fixed_result = await get_next_item_fixed( db, session_id, tryout_id, website_id, level_filter ) fixed_result.selection_method = "hybrid_fallback" fixed_result.reason = f"Hybrid mode (CTT fallback) - {fixed_result.reason}" return fixed_result async def update_theta( db: AsyncSession, session_id: str, item_id: int, is_correct: bool ) -> tuple[float, float]: """ Update session theta estimate based on response. Calls estimate_theta from irt_calibration.py. Updates session.theta and session.theta_se. Handles initial theta (uses 0.0 for first item). Clamps theta to [-3, +3]. Args: db: Database session session_id: Session identifier item_id: Item that was answered is_correct: Whether the answer was correct Returns: Tuple of (theta, theta_se) """ return await update_theta_after_response(db, session_id, item_id, is_correct) async def should_terminate( db: AsyncSession, session_id: str, max_items: Optional[int] = None, se_threshold: float = DEFAULT_SE_THRESHOLD ) -> TerminationCheck: """ Check if session should terminate. Termination conditions: - Reached max_items - Reached SE threshold (theta_se < se_threshold) - No more items available Args: db: Database session session_id: Session identifier max_items: Maximum items allowed (None = no limit) se_threshold: SE threshold for termination Returns: TerminationCheck with termination status and reason """ # Get session session_query = select(Session).where(Session.session_id == session_id) session_result = await db.execute(session_query) session = session_result.scalar_one_or_none() if not session: raise CATSelectionError(f"Session {session_id} not found") # Count answered items count_query = select(func.count(UserAnswer.id)).where( UserAnswer.session_id == session_id ) count_result = await db.execute(count_query) items_answered = count_result.scalar() or 0 # Check max items max_items_reached = False if max_items is not None and items_answered >= max_items: max_items_reached = True # Check SE threshold current_se = session.theta_se se_threshold_met = False if current_se is not None and current_se < se_threshold: se_threshold_met = True # Check if we have enough items for SE threshold (at least 15 items per PRD) min_items_for_se = 15 se_threshold_met = se_threshold_met and items_answered >= min_items_for_se # Determine termination should_term = max_items_reached or se_threshold_met # Build reason reasons = [] if max_items_reached: reasons.append(f"max items reached ({items_answered}/{max_items})") if se_threshold_met: reasons.append(f"SE threshold met ({current_se:.3f} < {se_threshold})") if not reasons: reasons.append("continuing") return TerminationCheck( should_terminate=should_term, reason="; ".join(reasons), items_answered=items_answered, current_se=current_se, max_items=max_items, se_threshold_met=se_threshold_met ) async def get_next_item( db: AsyncSession, session_id: str, selection_mode: Literal["fixed", "adaptive", "hybrid"] = "fixed", hybrid_transition_slot: int = 10, ai_generation_enabled: bool = False, level_filter: Optional[str] = None ) -> NextItemResult: """ Get next item based on selection mode. Main entry point for item selection. Args: db: Database session session_id: Session identifier selection_mode: Selection mode ('fixed', 'adaptive', 'hybrid') hybrid_transition_slot: Slot to transition in hybrid mode ai_generation_enabled: Whether AI generation is enabled level_filter: Optional difficulty level filter Returns: NextItemResult with selected item """ # Get session for tryout info session_query = select(Session).where(Session.session_id == session_id) session_result = await db.execute(session_query) session = session_result.scalar_one_or_none() if not session: raise CATSelectionError(f"Session {session_id} not found") tryout_id = session.tryout_id website_id = session.website_id if selection_mode == "fixed": return await get_next_item_fixed( db, session_id, tryout_id, website_id, level_filter ) elif selection_mode == "adaptive": return await get_next_item_adaptive( db, session_id, tryout_id, website_id, ai_generation_enabled, level_filter ) elif selection_mode == "hybrid": return await get_next_item_hybrid( db, session_id, tryout_id, website_id, hybrid_transition_slot, ai_generation_enabled, level_filter ) else: raise CATSelectionError(f"Unknown selection mode: {selection_mode}") async def check_user_level_reuse( db: AsyncSession, wp_user_id: str, website_id: int, tryout_id: str, slot: int, level: str ) -> bool: """ Check if user has already answered a question at this difficulty level. Per PRD FR-5.3: Check if student user_id already answered question at specific difficulty level. Args: db: Database session wp_user_id: WordPress user ID website_id: Website identifier tryout_id: Tryout identifier slot: Question slot level: Difficulty level Returns: True if user has answered at this level, False otherwise """ # Check if user has answered any item at this slot/level combination query = ( select(func.count(UserAnswer.id)) .join(Item, UserAnswer.item_id == Item.id) .where( UserAnswer.wp_user_id == wp_user_id, UserAnswer.website_id == website_id, UserAnswer.tryout_id == tryout_id, Item.slot == slot, Item.level == level ) ) result = await db.execute(query) count = result.scalar() or 0 return count > 0 async def get_available_levels_for_slot( db: AsyncSession, tryout_id: str, website_id: int, slot: int ) -> list[str]: """ Get available difficulty levels for a specific slot. Args: db: Database session tryout_id: Tryout identifier website_id: Website identifier slot: Question slot Returns: List of available levels """ query = ( select(Item.level) .where( Item.tryout_id == tryout_id, Item.website_id == website_id, Item.slot == slot ) .distinct() ) result = await db.execute(query) levels = [row[0] for row in result.all()] return levels # Admin playground functions for testing CAT behavior async def simulate_cat_selection( db: AsyncSession, tryout_id: str, website_id: int, initial_theta: float = 0.0, selection_mode: Literal["fixed", "adaptive", "hybrid"] = "adaptive", max_items: int = 15, se_threshold: float = DEFAULT_SE_THRESHOLD, hybrid_transition_slot: int = 10 ) -> dict: """ Simulate CAT selection for admin testing. Returns sequence of selected items with b values and theta progression. Args: db: Database session tryout_id: Tryout identifier website_id: Website identifier initial_theta: Starting theta value selection_mode: Selection mode to use max_items: Maximum items to simulate se_threshold: SE threshold for termination hybrid_transition_slot: Slot to transition in hybrid mode Returns: Dict with simulation results """ # Get all items for this tryout items_query = ( select(Item) .where( Item.tryout_id == tryout_id, Item.website_id == website_id ) .order_by(Item.slot) ) items_result = await db.execute(items_query) all_items = list(items_result.scalars().all()) if not all_items: return { "error": "No items found for this tryout", "tryout_id": tryout_id, "website_id": website_id } # Simulate selection selected_items = [] current_theta = initial_theta current_se = 3.0 # Start with high uncertainty used_item_ids = set() for i in range(max_items): # Get available items available_items = [item for item in all_items if item.id not in used_item_ids] if not available_items: break # Select based on mode if selection_mode == "adaptive": # Filter to calibrated items only calibrated_items = [item for item in available_items if item.calibrated and item.irt_b is not None] if not calibrated_items: # Fallback to any available item calibrated_items = available_items # Find item closest to current theta best_item = min( calibrated_items, key=lambda item: abs((item.irt_b or 0) - current_theta) ) elif selection_mode == "fixed": # Select in slot order best_item = min(available_items, key=lambda item: item.slot) else: # hybrid if i < hybrid_transition_slot: best_item = min(available_items, key=lambda item: item.slot) else: calibrated_items = [item for item in available_items if item.calibrated and item.irt_b is not None] if calibrated_items: best_item = min( calibrated_items, key=lambda item: abs((item.irt_b or 0) - current_theta) ) else: best_item = min(available_items, key=lambda item: item.slot) used_item_ids.add(best_item.id) # Simulate response (random based on probability) import random b = best_item.irt_b or estimate_b_from_ctt_p(best_item.ctt_p) if best_item.ctt_p else 0.0 p_correct = 1.0 / (1.0 + math.exp(-(current_theta - b))) is_correct = random.random() < p_correct # Update theta (simplified) responses = [1 if item.get('is_correct', True) else 0 for item in selected_items] responses.append(1 if is_correct else 0) b_params = [item['b'] for item in selected_items] b_params.append(b) new_theta, new_se = estimate_theta_mle(responses, b_params, current_theta) current_theta = new_theta current_se = new_se selected_items.append({ "slot": best_item.slot, "level": best_item.level, "b": b, "is_correct": is_correct, "theta_after": current_theta, "se_after": current_se, "calibrated": best_item.calibrated }) # Check SE threshold if current_se < se_threshold and i >= 14: # At least 15 items break return { "tryout_id": tryout_id, "website_id": website_id, "initial_theta": initial_theta, "selection_mode": selection_mode, "total_items": len(selected_items), "final_theta": current_theta, "final_se": current_se, "se_threshold_met": current_se < se_threshold, "items": selected_items }