"""
CTT (Classical Test Theory) Scoring Engine.

Implements exact Excel formulas for:
- p-value (Tingkat Kesukaran): p = Σ Benar / Total Peserta
- Bobot (Weight): Bobot = 1 - p
- NM (Nilai Mentah): NM = (Total_Bobot_Siswa / Total_Bobot_Max) × 1000
- NN (Nilai Nasional): NN = 500 + 100 × ((NM - Rataan) / SB)

All formulas match PRD Section 13.1 exactly.
"""

import math
from datetime import datetime, timezone
from typing import Optional

from sqlalchemy import Integer, cast, func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.item import Item
from app.models.tryout_stats import TryoutStats
from app.models.user_answer import UserAnswer


def calculate_ctt_p(total_correct: int, total_participants: int) -> float:
    """
    Calculate CTT p-value (Tingkat Kesukaran / Difficulty).

    Formula: p = Σ Benar / Total Peserta

    Args:
        total_correct: Number of correct answers (Σ Benar)
        total_participants: Total number of participants (Total Peserta)

    Returns:
        p-value in range [0.0, 1.0]

    Raises:
        ValueError: If total_participants is not positive, total_correct is
            negative, or total_correct exceeds total_participants
    """
    if total_participants <= 0:
        raise ValueError("total_participants must be greater than 0")
    if total_correct < 0:
        raise ValueError("total_correct cannot be negative")
    if total_correct > total_participants:
        raise ValueError("total_correct cannot exceed total_participants")

    p = total_correct / total_participants

    # Clamp to valid range [0, 1] (defensive; the checks above already
    # guarantee the ratio lies in this range)
    return max(0.0, min(1.0, p))


def calculate_ctt_bobot(p_value: float) -> float:
    """
    Calculate CTT bobot (weight) from p-value.

    Formula: Bobot = 1 - p

    Interpretation:
    - Easy questions (p > 0.70) have low bobot (< 0.30)
    - Difficult questions (p < 0.30) have high bobot (> 0.70)
    - Medium questions (0.30 ≤ p ≤ 0.70) have moderate bobot

    Args:
        p_value: CTT p-value in range [0.0, 1.0]

    Returns:
        bobot (weight) in range [0.0, 1.0]

    Raises:
        ValueError: If p_value is outside [0, 1] range
    """
    if not 0.0 <= p_value <= 1.0:
        raise ValueError(f"p_value must be in range [0, 1], got {p_value}")

    bobot = 1.0 - p_value

    # Clamp to valid range [0, 1]
    return max(0.0, min(1.0, bobot))


def calculate_ctt_nm(total_bobot_siswa: float, total_bobot_max: float) -> int:
    """
    Calculate CTT NM (Nilai Mentah / Raw Score).

    Formula: NM = (Total_Bobot_Siswa / Total_Bobot_Max) × 1000

    This is equivalent to Excel's SUMPRODUCT calculation where:
    - Total_Bobot_Siswa = Σ(bobot_earned for each correct answer)
    - Total_Bobot_Max = Σ(bobot for all questions)

    Args:
        total_bobot_siswa: Total weight earned by student
        total_bobot_max: Maximum possible weight (sum of all item bobots)

    Returns:
        NM (raw score) in range [0, 1000]

    Raises:
        ValueError: If total_bobot_max is not positive or total_bobot_siswa
            is negative
    """
    if total_bobot_max <= 0:
        raise ValueError("total_bobot_max must be greater than 0")
    if total_bobot_siswa < 0:
        raise ValueError("total_bobot_siswa cannot be negative")

    nm = (total_bobot_siswa / total_bobot_max) * 1000

    # Round to integer and clamp to valid range [0, 1000].
    # NOTE(review): Python's round() rounds exact halves to the nearest even
    # integer, whereas Excel's ROUND rounds halves away from zero -- confirm
    # which behaviour the PRD expects for values ending in exactly .5.
    nm_int = round(nm)
    return max(0, min(1000, nm_int))


def calculate_ctt_nn(nm: int, rataan: float, sb: float) -> int:
    """
    Calculate CTT NN (Nilai Nasional / Normalized Score).

    Formula: NN = 500 + 100 × ((NM - Rataan) / SB)

    Normalizes scores to mean=500, SD=100 distribution.

    Args:
        nm: Nilai Mentah (raw score) in range [0, 1000]
        rataan: Mean of NM scores
        sb: Standard deviation of NM scores (Simpangan Baku)

    Returns:
        NN (normalized score) in range [0, 1000]. When sb is zero or
        negative the function returns 500 instead of dividing by it.

    Raises:
        ValueError: If nm is outside [0, 1000]
    """
    if not 0 <= nm <= 1000:
        raise ValueError(f"nm must be in range [0, 1000], got {nm}")

    if sb <= 0:
        # If SD is 0 or negative, return default normalized score.
        # This handles the edge case where all scores are identical.
        return 500

    # Calculate normalized score
    z_score = (nm - rataan) / sb
    nn = 500 + 100 * z_score

    # Round to integer and clamp to valid range [0, 1000]
    nn_int = round(nn)
    return max(0, min(1000, nn_int))


def categorize_difficulty(p_value: float) -> str:
    """
    Categorize question difficulty based on CTT p-value.

    Categories per CTT standards (PRD Section 13.2):
    - p < 0.30 → Sukar (Sulit)
    - 0.30 ≤ p ≤ 0.70 → Sedang
    - p > 0.70 → Mudah

    Args:
        p_value: CTT p-value in range [0.0, 1.0]

    Returns:
        Difficulty category: "mudah", "sedang", or "sulit"
    """
    if p_value > 0.70:
        return "mudah"
    elif p_value >= 0.30:
        return "sedang"
    else:
        return "sulit"


async def calculate_ctt_p_for_item(
    db: AsyncSession, item_id: int
) -> Optional[float]:
    """
    Calculate CTT p-value for a specific item from existing responses.

    Aggregates all UserAnswer records for the item to calculate:
        p = Σ Benar / Total Peserta

    Args:
        db: Async database session
        item_id: Item ID to calculate p-value for

    Returns:
        p-value in range [0.0, 1.0], or None if no responses exist
    """
    # Cast the correctness flag to an integer so it can be summed in SQL.
    # (Assumes UserAnswer.is_correct is a boolean column -- confirm against
    # the model.) sqlalchemy.cast() renders CAST(... AS INTEGER); the generic
    # func.cast(...) form would emit a plain function call without a type.
    correct_as_int = cast(UserAnswer.is_correct, Integer)

    # Count total responses and correct responses in a single query
    result = await db.execute(
        select(
            func.count().label("total"),
            func.sum(correct_as_int).label("correct"),
        ).where(UserAnswer.item_id == item_id)
    )
    row = result.first()

    if row is None or row.total == 0:
        return None

    # SUM yields NULL when there is nothing to add; some drivers return a
    # Decimal for SUM, so normalise to int before validating.
    return calculate_ctt_p(int(row.correct or 0), row.total)


async def update_tryout_stats(
    db: AsyncSession,
    website_id: int,
    tryout_id: str,
    nm: int,
) -> TryoutStats:
    """
    Incrementally update TryoutStats with new NM score.

    Updates:
    - participant_count += 1
    - total_nm_sum += nm
    - total_nm_sq_sum += nm²
    - Recalculates rataan (mean) and sb (standard deviation)
    - Updates min_nm and max_nm if applicable

    The mean and standard deviation are derived from the stored running
    sums (Σ NM and Σ NM²) using the population variance formula
    variance = (Σ NM² / n) - mean², clamped at zero before the square root.

    The session is flushed but not committed; committing is left to the
    caller.

    Args:
        db: Async database session
        website_id: Website identifier
        tryout_id: Tryout identifier
        nm: New NM score to add

    Returns:
        Updated TryoutStats record
    """
    # Get or create TryoutStats
    result = await db.execute(
        select(TryoutStats).where(
            TryoutStats.website_id == website_id,
            TryoutStats.tryout_id == tryout_id,
        )
    )
    stats = result.scalar_one_or_none()

    if stats is None:
        # Create new stats record
        stats = TryoutStats(
            website_id=website_id,
            tryout_id=tryout_id,
            participant_count=1,
            total_nm_sum=float(nm),
            total_nm_sq_sum=float(nm * nm),
            rataan=float(nm),
            sb=0.0,  # SD is 0 for single data point
            min_nm=nm,
            max_nm=nm,
            last_calculated=datetime.now(timezone.utc),
        )
        db.add(stats)
    else:
        # Incrementally update existing stats
        stats.participant_count += 1
        stats.total_nm_sum += nm
        stats.total_nm_sq_sum += nm * nm

        # Update min/max
        if stats.min_nm is None or nm < stats.min_nm:
            stats.min_nm = nm
        if stats.max_nm is None or nm > stats.max_nm:
            stats.max_nm = nm

        # Recalculate mean and SD
        n = stats.participant_count
        sum_nm = stats.total_nm_sum
        sum_nm_sq = stats.total_nm_sq_sum

        # Mean = Σ NM / n
        stats.rataan = sum_nm / n

        # Variance = (Σ NM² / n) - (mean)²
        # Using population standard deviation
        if n > 1:
            variance = (sum_nm_sq / n) - (stats.rataan ** 2)
            # Clamp variance to non-negative (handles floating point errors)
            variance = max(0.0, variance)
            stats.sb = math.sqrt(variance)
        else:
            stats.sb = 0.0

        stats.last_calculated = datetime.now(timezone.utc)

    await db.flush()
    return stats


async def get_total_bobot_max(
    db: AsyncSession,
    website_id: int,
    tryout_id: str,
    level: str = "sedang",
) -> float:
    """
    Calculate total maximum bobot for a tryout.

    Total_Bobot_Max = Σ bobot for all questions in the tryout

    Args:
        db: Async database session
        website_id: Website identifier
        tryout_id: Tryout identifier
        level: Difficulty level to filter by (default: "sedang")

    Returns:
        Sum of all item bobots

    Raises:
        ValueError: If no items found or items have no bobot values
    """
    result = await db.execute(
        select(func.sum(Item.ctt_bobot)).where(
            Item.website_id == website_id,
            Item.tryout_id == tryout_id,
            Item.level == level,
        )
    )
    total_bobot = result.scalar()

    # SUM is NULL when no row matched (or every bobot is NULL); a zero total
    # is rejected as well because it cannot be used as a divisor for NM.
    if total_bobot is None or total_bobot == 0:
        raise ValueError(
            f"No items with bobot found for tryout {tryout_id}, level {level}"
        )

    return float(total_bobot)


def convert_ctt_p_to_irt_b(p_value: float) -> float:
    """
    Convert CTT p-value to IRT difficulty parameter (b).

    Formula: b ≈ -ln((1-p)/p)

    This provides an initial estimate for IRT calibration.
    Maps p ∈ (0, 1) to b ∈ (-∞, +∞); the result is clamped to [-3, +3].

    Boundary values do not raise: p ≤ 0 returns 3.0 and p ≥ 1 returns -3.0,
    which avoids taking the logarithm of zero or dividing by zero.

    Args:
        p_value: CTT p-value, normally in range (0.0, 1.0)

    Returns:
        IRT b-parameter estimate in range [-3.0, +3.0]
    """
    # Handle edge cases by clamping instead of evaluating the formula
    if p_value <= 0.0:
        return 3.0  # Very difficult
    if p_value >= 1.0:
        return -3.0  # Very easy

    # b ≈ -ln((1-p)/p)
    odds_ratio = (1 - p_value) / p_value
    b = -math.log(odds_ratio)

    # Clamp to valid IRT range [-3, +3]
    return max(-3.0, min(3.0, b))


def map_theta_to_nn(theta: float) -> int:
    """
    Map IRT theta (ability) to NN score for comparison.

    Formula: NN = 500 + (θ / 3) × 500

    Maps θ ∈ [-3, +3] to NN ∈ [0, 1000]; theta values outside that interval
    are clamped to it before mapping.

    Args:
        theta: IRT ability estimate in range [-3.0, +3.0]

    Returns:
        NN score in range [0, 1000]
    """
    # Clamp theta to valid range
    theta_clamped = max(-3.0, min(3.0, theta))

    # Map to NN
    nn = 500 + (theta_clamped / 3) * 500

    # Round and clamp to valid range
    return max(0, min(1000, round(nn)))