""" Dynamic Normalization Service. Implements dynamic normalization with real-time calculation of rataan and SB for each tryout. Supports multiple normalization modes: - Static: Use hardcoded rataan/SB from config - Dynamic: Calculate rataan/SB from participant NM scores in real-time - Hybrid: Use static until threshold reached, then switch to dynamic """ import logging import math from datetime import datetime, timezone from typing import Literal, Optional, Tuple from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.models.tryout import Tryout from app.models.tryout_stats import TryoutStats logger = logging.getLogger(__name__) async def calculate_dynamic_stats( db: AsyncSession, website_id: int, tryout_id: str, ) -> Tuple[Optional[float], Optional[float]]: """ Calculate current dynamic stats (rataan and SB) from TryoutStats. Fetches current TryoutStats for this (tryout_id, website_id) pair and returns the calculated rataan and SB values. Args: db: Async database session website_id: Website identifier tryout_id: Tryout identifier Returns: Tuple of (rataan, sb), both None if no stats exist """ result = await db.execute( select(TryoutStats).where( TryoutStats.website_id == website_id, TryoutStats.tryout_id == tryout_id, ) ) stats = result.scalar_one_or_none() if stats is None: return None, None return stats.rataan, stats.sb async def update_dynamic_normalization( db: AsyncSession, website_id: int, tryout_id: str, nm: int, ) -> Tuple[float, float]: """ Update dynamic normalization with new NM score. Fetches current TryoutStats and incrementally updates it with the new NM: - Increments participant_count by 1 - Adds NM to total_nm_sum - Adds NM² to total_nm_sq_sum - Recalculates rataan and sb Args: db: Async database session website_id: Website identifier tryout_id: Tryout identifier nm: Nilai Mentah (raw score) to add Returns: Tuple of updated (rataan, sb) Raises: ValueError: If nm is out of valid range [0, 1000] """ if not 0 <= nm <= 1000: raise ValueError(f"nm must be in range [0, 1000], got {nm}") result = await db.execute( select(TryoutStats).where( TryoutStats.website_id == website_id, TryoutStats.tryout_id == tryout_id, ) ) stats = result.scalar_one_or_none() if stats is None: # Initialize new stats record stats = TryoutStats( website_id=website_id, tryout_id=tryout_id, participant_count=1, total_nm_sum=float(nm), total_nm_sq_sum=float(nm * nm), rataan=float(nm), sb=0.0, # SD is 0 for single data point min_nm=nm, max_nm=nm, last_calculated=datetime.now(timezone.utc), ) db.add(stats) else: # Incrementally update existing stats stats.participant_count += 1 stats.total_nm_sum += nm stats.total_nm_sq_sum += nm * nm # Update min/max if stats.min_nm is None or nm < stats.min_nm: stats.min_nm = nm if stats.max_nm is None or nm > stats.max_nm: stats.max_nm = nm # Recalculate mean and SD n = stats.participant_count sum_nm = stats.total_nm_sum sum_nm_sq = stats.total_nm_sq_sum # Mean = Σ NM / n mean = sum_nm / n stats.rataan = mean # Variance = (Σ NM² / n) - (mean)² # Using population standard deviation if n > 1: variance = (sum_nm_sq / n) - (mean ** 2) # Clamp variance to non-negative (handles floating point errors) variance = max(0.0, variance) stats.sb = math.sqrt(variance) else: stats.sb = 0.0 stats.last_calculated = datetime.now(timezone.utc) await db.flush() logger.info( f"Updated dynamic normalization for tryout {tryout_id}, " f"website {website_id}: participant_count={stats.participant_count}, " f"rataan={stats.rataan:.2f}, sb={stats.sb:.2f}" ) # rataan and sb are always set by this function assert stats.rataan is not None assert stats.sb is not None return stats.rataan, stats.sb def apply_normalization( nm: int, rataan: float, sb: float, ) -> int: """ Apply normalization to NM to get NN (Nilai Nasional). Formula: NN = 500 + 100 × ((NM - Rataan) / SB) Normalizes scores to mean=500, SD=100 distribution. Args: nm: Nilai Mentah (raw score) in range [0, 1000] rataan: Mean of NM scores sb: Standard deviation of NM scores Returns: NN (normalized score) in range [0, 1000] Raises: ValueError: If nm is out of range or sb is invalid """ if not 0 <= nm <= 1000: raise ValueError(f"nm must be in range [0, 1000], got {nm}") if sb <= 0: # If SD is 0 or negative, return default normalized score # This handles edge case where all scores are identical return 500 # Calculate normalized score z_score = (nm - rataan) / sb nn = 500 + 100 * z_score # Round to integer and clamp to valid range [0, 1000] nn_int = round(nn) return max(0, min(1000, nn_int)) async def get_normalization_mode( db: AsyncSession, website_id: int, tryout_id: str, ) -> Literal["static", "dynamic", "hybrid"]: """ Get the current normalization mode for a tryout. Args: db: Async database session website_id: Website identifier tryout_id: Tryout identifier Returns: Normalization mode: "static", "dynamic", or "hybrid" Raises: ValueError: If tryout not found """ result = await db.execute( select(Tryout).where( Tryout.website_id == website_id, Tryout.tryout_id == tryout_id, ) ) tryout = result.scalar_one_or_none() if tryout is None: raise ValueError( f"Tryout {tryout_id} not found for website {website_id}" ) return tryout.normalization_mode async def check_threshold_for_dynamic( db: AsyncSession, website_id: int, tryout_id: str, ) -> bool: """ Check if participant count meets threshold for dynamic normalization. Compares current participant_count with min_sample_for_dynamic from config. Args: db: Async database session website_id: Website identifier tryout_id: Tryout identifier Returns: True if participant_count >= min_sample_for_dynamic, else False """ # Fetch current TryoutStats stats_result = await db.execute( select(TryoutStats).where( TryoutStats.website_id == website_id, TryoutStats.tryout_id == tryout_id, ) ) stats = stats_result.scalar_one_or_none() current_participant_count = stats.participant_count if stats else 0 # Fetch min_sample_for_dynamic from config tryout_result = await db.execute( select(Tryout.min_sample_for_dynamic).where( Tryout.website_id == website_id, Tryout.tryout_id == tryout_id, ) ) min_sample = tryout_result.scalar_one_or_none() if min_sample is None: # Default to 100 if not configured min_sample = 100 return current_participant_count >= min_sample async def get_normalization_params( db: AsyncSession, website_id: int, tryout_id: str, ) -> Tuple[float, float, Literal["static", "dynamic"]]: """ Get normalization parameters (rataan, sb) based on current mode. Determines which normalization parameters to use: - Static mode: Use config.static_rataan and config.static_sb - Dynamic mode: Use calculated rataan and sb from TryoutStats - Hybrid mode: Use static until threshold reached, then dynamic Args: db: Async database session website_id: Website identifier tryout_id: Tryout identifier Returns: Tuple of (rataan, sb, mode_used) Raises: ValueError: If tryout not found or dynamic stats unavailable """ # Get normalization mode mode = await get_normalization_mode(db, website_id, tryout_id) if mode == "static": # Use static values from config result = await db.execute( select(Tryout.static_rataan, Tryout.static_sb).where( Tryout.website_id == website_id, Tryout.tryout_id == tryout_id, ) ) row = result.scalar_one_or_none() if row is None: raise ValueError( f"Tryout {tryout_id} not found for website {website_id}" ) rataan, sb = row return rataan, sb, "static" elif mode == "dynamic": # Use dynamic values from stats rataan, sb = await calculate_dynamic_stats(db, website_id, tryout_id) if rataan is None or sb is None: raise ValueError( f"Dynamic normalization not available for tryout {tryout_id}. " "No stats have been calculated yet." ) if sb == 0: logger.warning( f"Standard deviation is 0 for tryout {tryout_id}. " "All NM scores are identical." ) return rataan, sb, "dynamic" else: # hybrid # Check threshold threshold_met = await check_threshold_for_dynamic(db, website_id, tryout_id) if threshold_met: # Use dynamic values rataan, sb = await calculate_dynamic_stats(db, website_id, tryout_id) if rataan is None or sb is None: # Fallback to static if dynamic not available result = await db.execute( select(Tryout.static_rataan, Tryout.static_sb).where( Tryout.website_id == website_id, Tryout.tryout_id == tryout_id, ) ) row = result.scalar_one_or_none() if row is None: raise ValueError( f"Tryout {tryout_id} not found for website {website_id}" ) rataan, sb = row return rataan, sb, "static" return rataan, sb, "dynamic" else: # Use static values result = await db.execute( select(Tryout.static_rataan, Tryout.static_sb).where( Tryout.website_id == website_id, Tryout.tryout_id == tryout_id, ) ) row = result.scalar_one_or_none() if row is None: raise ValueError( f"Tryout {tryout_id} not found for website {website_id}" ) rataan, sb = row return rataan, sb, "static" async def calculate_skewness( db: AsyncSession, website_id: int, tryout_id: str, ) -> Optional[float]: """ Calculate skewness of NM distribution for validation. Skewness measures the asymmetry of the probability distribution. Values: - Skewness ≈ 0: Symmetric distribution - Skewness > 0: Right-skewed (tail to the right) - Skewness < 0: Left-skewed (tail to the left) Formula: Skewness = (n / ((n-1)(n-2))) * Σ((x - mean) / SD)³ Args: db: Async database session website_id: Website identifier tryout_id: Tryout identifier Returns: Skewness value, or None if insufficient data """ result = await db.execute( select(TryoutStats).where( TryoutStats.website_id == website_id, TryoutStats.tryout_id == tryout_id, ) ) stats = result.scalar_one_or_none() if stats is None or stats.participant_count < 3: # Need at least 3 samples for skewness calculation return None n = stats.participant_count mean = stats.rataan sd = stats.sb if sd == 0: return 0.0 # All values are identical # Calculate skewness # We need individual NM values, which we don't have in TryoutStats # For now, return None as we need a different approach # This would require storing all NM values or calculating on-the-fly return None async def validate_dynamic_normalization( db: AsyncSession, website_id: int, tryout_id: str, target_mean: float = 500.0, target_sd: float = 100.0, mean_tolerance: float = 5.0, sd_tolerance: float = 5.0, ) -> Tuple[bool, dict]: """ Validate that dynamic normalization produces expected distribution. Checks if calculated rataan and sb are close to target values. Args: db: Async database session website_id: Website identifier tryout_id: Tryout identifier target_mean: Target mean (default: 500) target_sd: Target standard deviation (default: 100) mean_tolerance: Allowed deviation from target mean (default: 5) sd_tolerance: Allowed deviation from target SD (default: 5) Returns: Tuple of (is_valid, validation_details) validation_details contains: - participant_count: Number of participants - current_rataan: Current mean - current_sb: Current standard deviation - mean_deviation: Absolute deviation from target mean - sd_deviation: Absolute deviation from target SD - mean_within_tolerance: True if mean deviation < mean_tolerance - sd_within_tolerance: True if SD deviation < sd_tolerance - warnings: List of warning messages - suggestions: List of suggestions """ # Get current stats result = await db.execute( select(TryoutStats).where( TryoutStats.website_id == website_id, TryoutStats.tryout_id == tryout_id, ) ) stats = result.scalar_one_or_none() if stats is None or stats.rataan is None or stats.sb is None: return False, { "participant_count": 0, "current_rataan": None, "current_sb": None, "mean_deviation": None, "sd_deviation": None, "mean_within_tolerance": False, "sd_within_tolerance": False, "warnings": ["No statistics available for validation"], "suggestions": ["Wait for more participants to complete sessions"], } # Calculate deviations mean_deviation = abs(stats.rataan - target_mean) sd_deviation = abs(stats.sb - target_sd) # Check tolerance mean_within_tolerance = mean_deviation <= mean_tolerance sd_within_tolerance = sd_deviation <= sd_tolerance is_valid = mean_within_tolerance and sd_within_tolerance # Generate warnings warnings = [] suggestions = [] if not mean_within_tolerance: warnings.append(f"Mean deviation ({mean_deviation:.2f}) exceeds tolerance ({mean_tolerance})") if stats.rataan > target_mean: suggestions.append("Distribution may be right-skewed - consider checking question difficulty") else: suggestions.append("Distribution may be left-skewed - consider checking question difficulty") if not sd_within_tolerance: warnings.append(f"SD deviation ({sd_deviation:.2f}) exceeds tolerance ({sd_tolerance})") if stats.sb < target_sd: suggestions.append("SD too low - scores may be too tightly clustered") else: suggestions.append("SD too high - scores may have too much variance") # Check for skewness skewness = await calculate_skewness(db, website_id, tryout_id) if skewness is not None and abs(skewness) > 0.5: warnings.append(f"Distribution skewness ({skewness:.2f}) > 0.5 - distribution may be asymmetric") suggestions.append("Consider using static normalization if dynamic normalization is unstable") # Check participant count if stats.participant_count < 100: suggestions.append(f"Participant count ({stats.participant_count}) below recommended minimum (100)") return is_valid, { "participant_count": stats.participant_count, "current_rataan": stats.rataan, "current_sb": stats.sb, "mean_deviation": mean_deviation, "sd_deviation": sd_deviation, "mean_within_tolerance": mean_within_tolerance, "sd_within_tolerance": sd_within_tolerance, "warnings": warnings, "suggestions": suggestions, }