@dataclass
class StudentPerformanceRecord:
    """Flat, per-session row of a student performance report."""

    # --- identifiers -------------------------------------------------------
    session_id: str
    wp_user_id: str
    tryout_id: str

    # --- scores (None when the value is not available for the session) ------
    NM: Optional[int]
    NN: Optional[int]
    theta: Optional[float]
    theta_se: Optional[float]
    total_benar: int

    # --- timing ------------------------------------------------------------
    time_spent: int  # Total time in seconds
    start_time: Optional[datetime]
    end_time: Optional[datetime]

    # --- scoring configuration recorded on the session ----------------------
    scoring_mode_used: str
    rataan_used: Optional[float]
    sb_used: Optional[float]


@dataclass
class AggregatePerformanceStats:
    """Summary statistics computed over all sessions of one tryout."""

    tryout_id: str
    participant_count: int

    # NM distribution
    avg_nm: Optional[float]
    std_nm: Optional[float]
    min_nm: Optional[int]
    max_nm: Optional[int]
    median_nm: Optional[float]

    # NN distribution
    avg_nn: Optional[float]
    std_nn: Optional[float]

    avg_theta: Optional[float]
    pass_rate: float  # Percentage with NN >= 500
    avg_time_spent: float  # Average time in seconds
@dataclass
class StudentPerformanceReport:
    """Student performance report: aggregate statistics plus per-session rows."""

    generated_at: datetime
    tryout_id: str
    website_id: int
    date_range: Optional[Dict[str, datetime]]
    aggregate: AggregatePerformanceStats
    individual_records: List[StudentPerformanceRecord]


@dataclass
class ItemAnalysisRecord:
    """Analysis row describing one item (question)."""

    item_id: int
    slot: int
    level: str

    # Classical test theory values stored on the item
    ctt_p: Optional[float]
    ctt_bobot: Optional[float]
    ctt_category: Optional[str]

    # IRT calibration values stored on the item
    irt_b: Optional[float]
    irt_se: Optional[float]
    calibrated: bool
    calibration_sample_size: int

    correctness_rate: float  # Actual correctness from responses
    item_total_correlation: Optional[float]
    information_values: Dict[float, float]  # theta -> information
    optimal_theta_range: str  # e.g., "-1 to 0"


@dataclass
class ItemAnalysisReport:
    """Item analysis report: one record per item plus a summary dictionary."""

    generated_at: datetime
    tryout_id: str
    website_id: int
    total_items: int
    items: List[ItemAnalysisRecord]
    summary: Dict[str, Any]


@dataclass
class CalibrationItemStatus:
    """Calibration state of one item."""

    item_id: int
    slot: int
    level: str
    sample_size: int
    calibrated: bool
    irt_b: Optional[float]
    irt_se: Optional[float]
    ctt_p: Optional[float]


@dataclass
class CalibrationStatusReport:
    """Calibration progress and rollout-readiness report for one tryout."""

    generated_at: datetime
    tryout_id: str
    website_id: int

    # Progress counters
    total_items: int
    calibrated_items: int
    calibration_percentage: float
    items_awaiting_calibration: List[CalibrationItemStatus]
    avg_calibration_sample_size: float

    # Readiness
    estimated_time_to_90_percent: Optional[str]
    ready_for_irt_rollout: bool

    # Status of every item, calibrated or not
    items: List[CalibrationItemStatus]


@dataclass
class TryoutComparisonRecord:
    """One tryout's data point inside a comparison report."""

    tryout_id: str
    date: Optional[str]
    subject: Optional[str]
    participant_count: int
    avg_nm: Optional[float]
    avg_nn: Optional[float]
    avg_theta: Optional[float]
    std_nm: Optional[float]
    calibration_percentage: float


@dataclass
class TryoutComparisonReport:
    """Comparison of several tryouts, grouped either by date or by subject."""

    generated_at: datetime
    comparison_type: Literal["date", "subject"]
    tryouts: List[TryoutComparisonRecord]
    trends: Optional[Dict[str, Any]]
    normalization_impact: Optional[Dict[str, Any]]
def _calculate_item_information(theta: float, b: float) -> float:
    """
    Evaluate the 1PL item information function at a given ability.

    I(θ) = P(θ) * (1 - P(θ)) where P(θ) = 1 / (1 + e^-(θ-b))

    The logit (θ - b) is clamped to [-30, 30] before it is exponentiated.
    """
    logit = max(-30, min(30, theta - b))
    prob_correct = 1.0 / (1.0 + math.exp(-logit))
    return prob_correct * (1 - prob_correct)


def _calculate_item_total_correlation(
    item_responses: List[int],
    total_scores: List[int]
) -> Optional[float]:
    """
    Correlate per-response item scores with the matching total scores.

    Both lists are paired by position. The result is the Pearson correlation
    computed with population (divide-by-n) moments, rounded to 4 decimals.

    Returns None when either list has fewer than 5 entries, when the lists
    differ in length, or when either variable has zero variance.
    """
    sample_size = len(item_responses)
    if sample_size < 5 or len(total_scores) < 5:
        return None
    if len(total_scores) != sample_size:
        return None

    mean_item = sum(item_responses) / sample_size
    mean_total = sum(total_scores) / sample_size

    var_item = sum((resp - mean_item) ** 2 for resp in item_responses) / sample_size
    var_total = sum((score - mean_total) ** 2 for score in total_scores) / sample_size
    if var_item == 0 or var_total == 0:
        # A constant variable has no defined correlation.
        return None

    sd_item = math.sqrt(var_item)
    sd_total = math.sqrt(var_total)

    cross_products = sum(
        (resp - mean_item) * (score - mean_total)
        for resp, score in zip(item_responses, total_scores)
    )
    covariance = cross_products / sample_size

    return round(covariance / (sd_item * sd_total), 4)


def _calculate_median(values: List[float]) -> Optional[float]:
    """Return the median of ``values``, or None for an empty input."""
    if not values:
        return None

    ordered = sorted(values)
    middle, is_odd = divmod(len(ordered), 2)
    if is_odd:
        return ordered[middle]
    # Even count: average the two central values.
    return (ordered[middle - 1] + ordered[middle]) / 2


def _calculate_std(values: List[float]) -> Optional[float]:
    """Return the population standard deviation, or None for fewer than 2 values."""
    count = len(values) if values else 0
    if count < 2:
        return None

    centre = sum(values) / count
    squared_deviation = sum((v - centre) ** 2 for v in values)
    return math.sqrt(squared_deviation / count)
async def generate_student_performance_report(
    tryout_id: str,
    website_id: int,
    db: AsyncSession,
    date_range: Optional[Dict[str, datetime]] = None,
    format_type: Literal["individual", "aggregate", "both"] = "both"
) -> StudentPerformanceReport:
    """
    Generate student performance report.

    Only completed sessions of the given tryout/website are included, ordered
    by NN descending with NULL NN last.

    Args:
        tryout_id: Tryout identifier
        website_id: Website identifier
        db: Database session
        date_range: Optional date range filter {"start": datetime, "end": datetime};
            both bounds are applied to the session start time
        format_type: Report format - individual, aggregate, or both. The
            aggregate statistics are always computed; "aggregate" only
            suppresses the individual records.

    Returns:
        StudentPerformanceReport with aggregate stats and/or individual records
    """
    pass_threshold = 500  # NN value at or above which a session counts as passed

    # Filters shared by the session query and the time-spent query below.
    session_filters = [
        Session.tryout_id == tryout_id,
        Session.website_id == website_id,
        Session.is_completed == True,  # noqa: E712 - SQL expression, not a Python comparison
    ]
    if date_range:
        if date_range.get("start"):
            session_filters.append(Session.start_time >= date_range["start"])
        if date_range.get("end"):
            session_filters.append(Session.start_time <= date_range["end"])

    result = await db.execute(
        select(Session)
        .where(*session_filters)
        .order_by(Session.NN.desc().nullslast())
    )
    sessions = result.scalars().all()

    # Total time per session in ONE grouped query instead of one query per
    # session. Sessions without any answers are simply absent from the map
    # and fall back to 0 below.
    time_by_session: Dict[Any, Any] = {}
    if sessions:
        time_result = await db.execute(
            select(UserAnswer.session_id, func.sum(UserAnswer.time_spent))
            .where(
                UserAnswer.session_id.in_(
                    select(Session.session_id).where(*session_filters)
                )
            )
            .group_by(UserAnswer.session_id)
        )
        time_by_session = {
            session_id: (total or 0) for session_id, total in time_result.all()
        }

    individual_records = []
    nm_values = []
    nn_values = []
    theta_values = []
    time_spent_values = []
    pass_count = 0

    for session in sessions:
        total_time = time_by_session.get(session.session_id, 0)

        individual_records.append(
            StudentPerformanceRecord(
                session_id=session.session_id,
                wp_user_id=session.wp_user_id,
                tryout_id=session.tryout_id,
                NM=session.NM,
                NN=session.NN,
                theta=session.theta,
                theta_se=session.theta_se,
                total_benar=session.total_benar,
                time_spent=total_time,
                start_time=session.start_time,
                end_time=session.end_time,
                scoring_mode_used=session.scoring_mode_used,
                rataan_used=session.rataan_used,
                sb_used=session.sb_used,
            )
        )

        # Collect statistics; missing scores are skipped per metric.
        if session.NM is not None:
            nm_values.append(float(session.NM))
        if session.NN is not None:
            nn_values.append(float(session.NN))
            if session.NN >= pass_threshold:
                pass_count += 1
        if session.theta is not None:
            theta_values.append(session.theta)
        time_spent_values.append(total_time)

    # The pass rate is relative to ALL completed sessions, so sessions
    # without an NN count as not passed.
    participant_count = len(sessions)
    pass_rate = (pass_count / participant_count * 100) if participant_count > 0 else 0.0
    avg_time = sum(time_spent_values) / len(time_spent_values) if time_spent_values else 0.0

    aggregate = AggregatePerformanceStats(
        tryout_id=tryout_id,
        participant_count=participant_count,
        avg_nm=sum(nm_values) / len(nm_values) if nm_values else None,
        std_nm=_calculate_std(nm_values),
        min_nm=int(min(nm_values)) if nm_values else None,
        max_nm=int(max(nm_values)) if nm_values else None,
        median_nm=_calculate_median(nm_values),
        avg_nn=sum(nn_values) / len(nn_values) if nn_values else None,
        std_nn=_calculate_std(nn_values),
        avg_theta=sum(theta_values) / len(theta_values) if theta_values else None,
        pass_rate=round(pass_rate, 2),
        avg_time_spent=round(avg_time, 2),
    )

    return StudentPerformanceReport(
        generated_at=datetime.now(timezone.utc),
        tryout_id=tryout_id,
        website_id=website_id,
        date_range=date_range,
        aggregate=aggregate,
        individual_records=individual_records if format_type in ("individual", "both") else [],
    )
async def generate_item_analysis_report(
    tryout_id: str,
    website_id: int,
    db: AsyncSession,
    filter_by: Optional[Literal["difficulty", "calibrated", "discrimination"]] = None,
    difficulty_level: Optional[Literal["mudah", "sedang", "sulit"]] = None
) -> ItemAnalysisReport:
    """
    Generate item analysis report.

    Args:
        tryout_id: Tryout identifier
        website_id: Website identifier
        db: Database session
        filter_by: Optional filter - difficulty, calibrated, or discrimination.
            "difficulty" and "calibrated" are applied in SQL; "discrimination"
            is applied after the per-item correlations have been computed.
        difficulty_level: Filter by difficulty level if filter_by is "difficulty"

    Returns:
        ItemAnalysisReport with item difficulty, discrimination, and information.
        The summary is computed over the items that remain after filtering.
    """
    # Function-scope import: a real SQL CAST needs sqlalchemy.cast() with a
    # type object. func.cast(col, type_=func.INTEGER) only emits a generic
    # one-argument function call named "cast".
    from sqlalchemy import Integer, cast

    high_discrimination_cutoff = 0.3
    theta_levels = [-3.0, -2.0, -1.0, 0.0, 1.0, 2.0, 3.0]

    # Get all items for this tryout
    query = (
        select(Item)
        .where(
            Item.tryout_id == tryout_id,
            Item.website_id == website_id,
        )
        .order_by(Item.slot)
    )

    if filter_by == "difficulty" and difficulty_level:
        query = query.where(Item.level == difficulty_level)
    elif filter_by == "calibrated":
        query = query.where(Item.calibrated == True)  # noqa: E712 - SQL expression

    result = await db.execute(query)
    items = result.scalars().all()

    item_records = []

    for item in items:
        # Correctness rate over every recorded answer to this item.
        resp_result = await db.execute(
            select(
                func.count().label("total"),
                func.sum(cast(UserAnswer.is_correct, Integer)).label("correct"),
            ).where(UserAnswer.item_id == item.id)
        )
        resp_stats = resp_result.first()

        correctness_rate = 0.0
        if resp_stats and resp_stats.total > 0:
            correctness_rate = (resp_stats.correct or 0) / resp_stats.total

        # Item-total correlation: pair each answer with its session's NN.
        # Only the two needed columns are fetched; sessions without an NN
        # are excluded by the query itself.
        correlation_result = await db.execute(
            select(UserAnswer.is_correct, Session.NN)
            .select_from(UserAnswer)
            .join(Session, UserAnswer.session_id == Session.session_id)
            .where(
                UserAnswer.item_id == item.id,
                Session.NN.isnot(None),
            )
        )
        item_responses = []
        total_scores = []
        for is_correct, nn in correlation_result.all():
            item_responses.append(1 if is_correct else 0)
            total_scores.append(nn or 0)

        item_total_corr = _calculate_item_total_correlation(item_responses, total_scores)

        # Information values and the ability band where the item is most
        # informative. For the 1PL model information peaks at theta == b,
        # so the band is chosen directly from b.
        information_values = {}
        optimal_theta_range = "N/A"
        if item.irt_b is not None:
            b = item.irt_b
            for theta in theta_levels:
                information_values[theta] = round(
                    _calculate_item_information(theta, b), 4
                )
            if b < -1:
                optimal_theta_range = "-3 to -1"
            elif b < 0:
                optimal_theta_range = "-1 to 0"
            elif b < 1:
                optimal_theta_range = "0 to 1"
            else:
                optimal_theta_range = "1 to 3"

        item_records.append(
            ItemAnalysisRecord(
                item_id=item.id,
                slot=item.slot,
                level=item.level,
                ctt_p=round(item.ctt_p, 4) if item.ctt_p is not None else None,
                ctt_bobot=round(item.ctt_bobot, 4) if item.ctt_bobot is not None else None,
                ctt_category=item.ctt_category,
                irt_b=round(item.irt_b, 4) if item.irt_b is not None else None,
                irt_se=round(item.irt_se, 4) if item.irt_se is not None else None,
                calibrated=item.calibrated,
                calibration_sample_size=item.calibration_sample_size,
                correctness_rate=round(correctness_rate, 4),
                item_total_correlation=item_total_corr,
                information_values=information_values,
                optimal_theta_range=optimal_theta_range,
            )
        )

    # Apply discrimination filter if requested (needs the computed correlations)
    if filter_by == "discrimination":
        item_records = [
            r for r in item_records
            if r.item_total_correlation is not None
            and r.item_total_correlation > high_discrimination_cutoff
        ]

    # Calculate summary statistics
    record_count = len(item_records)
    avg_correctness = sum(r.correctness_rate for r in item_records) / record_count if item_records else 0
    calibrated_count = sum(1 for r in item_records if r.calibrated)
    high_discrimination = sum(
        1 for r in item_records
        if r.item_total_correlation is not None
        and r.item_total_correlation > high_discrimination_cutoff
    )

    summary = {
        "total_items": record_count,
        "calibrated_items": calibrated_count,
        "calibration_percentage": round(calibrated_count / record_count * 100, 2) if item_records else 0,
        "avg_correctness_rate": round(avg_correctness, 4),
        "high_discrimination_items": high_discrimination,
        "difficulty_distribution": {
            "mudah": sum(1 for r in item_records if r.level == "mudah"),
            "sedang": sum(1 for r in item_records if r.level == "sedang"),
            "sulit": sum(1 for r in item_records if r.level == "sulit"),
        }
    }

    return ItemAnalysisReport(
        generated_at=datetime.now(timezone.utc),
        tryout_id=tryout_id,
        website_id=website_id,
        total_items=record_count,
        items=item_records,
        summary=summary,
    )
async def generate_calibration_status_report(
    tryout_id: str,
    website_id: int,
    db: AsyncSession
) -> CalibrationStatusReport:
    """
    Generate calibration status report.

    Args:
        tryout_id: Tryout identifier
        website_id: Website identifier
        db: Database session

    Returns:
        CalibrationStatusReport with calibration progress and readiness.
        ``items_awaiting_calibration`` lists uncalibrated items whose sample
        size is still below the tryout's minimum calibration sample (500 when
        the tryout row is not found). ``ready_for_irt_rollout`` is True once
        at least 90% of the items are calibrated.
    """
    target_percentage = 90

    # Get all items for this tryout
    result = await db.execute(
        select(Item)
        .where(
            Item.tryout_id == tryout_id,
            Item.website_id == website_id,
        )
        .order_by(Item.slot)
    )
    items = result.scalars().all()

    # Get tryout stats for response rate estimation
    stats_result = await db.execute(
        select(TryoutStats).where(
            TryoutStats.tryout_id == tryout_id,
            TryoutStats.website_id == website_id,
        )
    )
    stats = stats_result.scalar_one_or_none()

    # Get tryout config for min_calibration_sample
    tryout_result = await db.execute(
        select(Tryout).where(
            Tryout.tryout_id == tryout_id,
            Tryout.website_id == website_id,
        )
    )
    tryout = tryout_result.scalar_one_or_none()
    min_sample = tryout.min_calibration_sample if tryout else 500

    item_statuses = []
    items_awaiting = []
    total_sample_size = 0
    calibrated_count = 0

    for item in items:
        status = CalibrationItemStatus(
            item_id=item.id,
            slot=item.slot,
            level=item.level,
            sample_size=item.calibration_sample_size,
            calibrated=item.calibrated,
            irt_b=round(item.irt_b, 4) if item.irt_b is not None else None,
            irt_se=round(item.irt_se, 4) if item.irt_se is not None else None,
            ctt_p=round(item.ctt_p, 4) if item.ctt_p is not None else None,
        )
        item_statuses.append(status)
        total_sample_size += item.calibration_sample_size

        if item.calibrated:
            calibrated_count += 1
        elif item.calibration_sample_size < min_sample:
            items_awaiting.append(status)

    total_items = len(items)
    calibration_percentage = (calibrated_count / total_items * 100) if total_items > 0 else 0
    avg_sample_size = total_sample_size / total_items if total_items > 0 else 0

    # Estimate time to reach 90% calibration
    estimated_time = None
    if stats and calibration_percentage < target_percentage and stats.last_calculated:
        # NOTE(review): assumes stats.last_calculated is timezone-aware; a
        # naive datetime would raise TypeError on this subtraction - confirm.
        days_since_start = max(1, (datetime.now(timezone.utc) - stats.last_calculated).days)
        response_rate = stats.participant_count / days_since_start

        if response_rate > 0:
            # Smallest item count that reaches the target, rounded UP with
            # integer arithmetic. Truncating (int(total * 0.9)) could yield
            # zero items needed, and thus "0 days", while the report is
            # still below 90%.
            target_items = (total_items * target_percentage + 99) // 100
            items_needed = target_items - calibrated_count
            responses_needed = items_needed * min_sample
            avg_responses_per_item = avg_sample_size if avg_sample_size > 0 else min_sample / 2
            # NOTE(review): heuristic kept from the original implementation;
            # the unit analysis of this formula should be confirmed.
            days_needed = responses_needed / (response_rate * avg_responses_per_item) if avg_responses_per_item > 0 else 0
            estimated_time = f"{int(days_needed)} days"

    ready_for_irt = calibration_percentage >= target_percentage

    return CalibrationStatusReport(
        generated_at=datetime.now(timezone.utc),
        tryout_id=tryout_id,
        website_id=website_id,
        total_items=total_items,
        calibrated_items=calibrated_count,
        calibration_percentage=round(calibration_percentage, 2),
        items_awaiting_calibration=items_awaiting,
        avg_calibration_sample_size=round(avg_sample_size, 2),
        estimated_time_to_90_percent=estimated_time,
        ready_for_irt_rollout=ready_for_irt,
        items=item_statuses,
    )
async def generate_tryout_comparison_report(
    tryout_ids: List[str],
    website_id: int,
    db: AsyncSession,
    group_by: Literal["date", "subject"] = "date",
    date_ranges: Optional[List[Dict[str, datetime]]] = None
) -> TryoutComparisonReport:
    """
    Generate tryout comparison report.

    Args:
        tryout_ids: List of tryout identifiers to compare
        website_id: Website identifier
        db: Database session
        group_by: Group by date or subject
        date_ranges: Optional date ranges for each tryout (accepted for
            interface compatibility; not used by the current implementation)

    Returns:
        TryoutComparisonReport comparing tryouts. ``trends`` is only filled
        for ``group_by == "date"`` when at least two tryouts have a date.
    """
    # Function-scope import: a real SQL CAST needs sqlalchemy.cast() with a
    # type object. func.cast(col, type_=func.INTEGER) only emits a generic
    # one-argument function call named "cast".
    from sqlalchemy import Integer, cast

    comparison_records = []
    normalization_impact = {}

    for tryout_id in tryout_ids:
        # Get tryout stats
        stats_result = await db.execute(
            select(TryoutStats).where(
                TryoutStats.tryout_id == tryout_id,
                TryoutStats.website_id == website_id,
            )
        )
        stats = stats_result.scalar_one_or_none()

        # Get tryout config
        tryout_result = await db.execute(
            select(Tryout).where(
                Tryout.tryout_id == tryout_id,
                Tryout.website_id == website_id,
            )
        )
        tryout = tryout_result.scalar_one_or_none()

        # Get calibration percentage
        cal_result = await db.execute(
            select(
                func.count().label("total"),
                func.sum(cast(Item.calibrated, Integer)).label("calibrated"),
            ).where(
                Item.tryout_id == tryout_id,
                Item.website_id == website_id,
            )
        )
        cal_stats = cal_result.first()
        cal_percentage = 0.0
        if cal_stats and cal_stats.total > 0:
            cal_percentage = (cal_stats.calibrated or 0) / cal_stats.total * 100

        # Extract date/subject
        # Tryout ID format: "mat_sd_week1", "bahasa_sma_week1"
        date_str = None
        subject = None

        if group_by == "subject":
            # First two underscore-separated parts, upper-cased
            # (e.g., "mat_sd_week1" -> "MAT SD")
            parts = tryout_id.split("_")
            if len(parts) >= 2:
                subject = f"{parts[0].upper()} {parts[1].upper()}"
        elif tryout and tryout.created_at:
            # Guarded: a tryout row without created_at must not crash the report.
            date_str = tryout.created_at.strftime("%Y-%m-%d")

        # Explicit None checks so that a legitimate 0.0 mean / deviation is
        # reported as 0.0 rather than dropped as "missing".
        rataan = stats.rataan if stats else None
        sb = stats.sb if stats else None

        comparison_records.append(
            TryoutComparisonRecord(
                tryout_id=tryout_id,
                date=date_str,
                subject=subject,
                participant_count=stats.participant_count if stats else 0,
                avg_nm=round(rataan, 2) if rataan is not None else None,
                # NOTE(review): avg NN is derived as rataan + 500, as in the
                # original code - confirm this matches the NN definition.
                avg_nn=round(rataan + 500, 2) if rataan is not None else None,
                avg_theta=None,  # Would need to calculate from sessions
                std_nm=round(sb, 2) if sb is not None else None,
                calibration_percentage=round(cal_percentage, 2),
            )
        )

        # Track normalization impact
        if tryout:
            normalization_impact[tryout_id] = {
                "mode": tryout.normalization_mode,
                "static_rataan": tryout.static_rataan,
                "static_sb": tryout.static_sb,
                "dynamic_rataan": rataan,
                "dynamic_sb": sb,
            }

    # Calculate trends between the earliest and the latest dated tryout
    trends = None
    if group_by == "date" and len(comparison_records) > 1:
        sorted_records = sorted(
            [r for r in comparison_records if r.date],
            key=lambda x: x.date
        )
        if len(sorted_records) > 1:
            first = sorted_records[0]
            last = sorted_records[-1]
            nm_delta = (last.avg_nm or 0) - (first.avg_nm or 0)
            if nm_delta > 0:
                nm_trend = "increasing"
            elif nm_delta < 0:
                nm_trend = "decreasing"
            else:
                # No change used to be mislabelled as "decreasing".
                nm_trend = "stable"
            trends = {
                "nm_trend": nm_trend,
                "nm_change": round(nm_delta, 2),
                "calibration_trend": "improving" if last.calibration_percentage > first.calibration_percentage else "stable",
            }

    return TryoutComparisonReport(
        generated_at=datetime.now(timezone.utc),
        comparison_type=group_by,
        tryouts=comparison_records,
        trends=trends,
        normalization_impact=normalization_impact if normalization_impact else None,
    )
def export_report_to_csv(report_data: Union[StudentPerformanceReport, ItemAnalysisReport, CalibrationStatusReport, TryoutComparisonReport], filename: str) -> str:
    """
    Write a report to a timestamped CSV file.

    A student performance report is exported as its individual records when
    it has any, otherwise as a single row of aggregate statistics. The other
    report types are exported as one row per item / tryout.

    Args:
        report_data: Report data object
        filename: Base filename (without extension)

    Returns:
        Name of the generated file: ``{filename}_{YYYYmmdd_HHMMSS}.csv``

    Raises:
        ValueError: If report_data is not one of the supported report types.
    """
    def project(source, columns):
        # Build one CSV row: (column header, attribute name) pairs, in order.
        return {header: getattr(source, attr) for header, attr in columns}

    def iso_or_none(moment):
        return moment.isoformat() if moment else None

    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_name = f"{filename}_{stamp}.csv"

    if isinstance(report_data, StudentPerformanceReport):
        if report_data.individual_records:
            session_columns = (
                ("session_id", "session_id"),
                ("wp_user_id", "wp_user_id"),
                ("tryout_id", "tryout_id"),
                ("NM", "NM"),
                ("NN", "NN"),
                ("theta", "theta"),
                ("theta_se", "theta_se"),
                ("total_benar", "total_benar"),
                ("time_spent_seconds", "time_spent"),
                ("start_time", "start_time"),
                ("end_time", "end_time"),
                ("scoring_mode", "scoring_mode_used"),
            )
            rows = []
            for entry in report_data.individual_records:
                row = project(entry, session_columns)
                # Timestamps are serialised as ISO strings (None stays None);
                # re-assigning an existing key keeps the column order.
                row["start_time"] = iso_or_none(entry.start_time)
                row["end_time"] = iso_or_none(entry.end_time)
                rows.append(row)
        else:
            # No individual records: export the aggregate statistics only.
            aggregate_columns = (
                ("tryout_id", "tryout_id"),
                ("participant_count", "participant_count"),
                ("avg_nm", "avg_nm"),
                ("std_nm", "std_nm"),
                ("min_nm", "min_nm"),
                ("max_nm", "max_nm"),
                ("median_nm", "median_nm"),
                ("avg_nn", "avg_nn"),
                ("std_nn", "std_nn"),
                ("avg_theta", "avg_theta"),
                ("pass_rate_percent", "pass_rate"),
                ("avg_time_spent_seconds", "avg_time_spent"),
            )
            rows = [project(report_data.aggregate, aggregate_columns)]
    elif isinstance(report_data, ItemAnalysisReport):
        item_columns = (
            ("item_id", "item_id"),
            ("slot", "slot"),
            ("level", "level"),
            ("ctt_p", "ctt_p"),
            ("ctt_bobot", "ctt_bobot"),
            ("ctt_category", "ctt_category"),
            ("irt_b", "irt_b"),
            ("irt_se", "irt_se"),
            ("calibrated", "calibrated"),
            ("sample_size", "calibration_sample_size"),
            ("correctness_rate", "correctness_rate"),
            ("item_total_correlation", "item_total_correlation"),
            ("optimal_theta_range", "optimal_theta_range"),
        )
        rows = [project(entry, item_columns) for entry in report_data.items]
    elif isinstance(report_data, CalibrationStatusReport):
        status_columns = (
            ("item_id", "item_id"),
            ("slot", "slot"),
            ("level", "level"),
            ("sample_size", "sample_size"),
            ("calibrated", "calibrated"),
            ("irt_b", "irt_b"),
            ("irt_se", "irt_se"),
            ("ctt_p", "ctt_p"),
        )
        rows = [project(entry, status_columns) for entry in report_data.items]
    elif isinstance(report_data, TryoutComparisonReport):
        comparison_columns = (
            ("tryout_id", "tryout_id"),
            ("date", "date"),
            ("subject", "subject"),
            ("participant_count", "participant_count"),
            ("avg_nm", "avg_nm"),
            ("avg_nn", "avg_nn"),
            ("avg_theta", "avg_theta"),
            ("std_nm", "std_nm"),
            ("calibration_percentage", "calibration_percentage"),
        )
        rows = [project(entry, comparison_columns) for entry in report_data.tryouts]
    else:
        raise ValueError(f"Unsupported report type: {type(report_data)}")

    pd.DataFrame(rows).to_csv(output_name, index=False)
    logger.info(f"Exported report to CSV: {output_name}")
    return output_name
def export_report_to_excel(report_data: Union[StudentPerformanceReport, ItemAnalysisReport, CalibrationStatusReport, TryoutComparisonReport], filename: str) -> str:
    """
    Export report data to Excel (.xlsx) format.

    Sheets written per report type:
        - Student performance: "Summary", plus "Individual Records" when any exist
        - Item analysis: "Summary", "Items", plus "Information Functions"
          when at least one item has information values
        - Calibration status: "Summary", "Awaiting Calibration" (when
          non-empty), "All Items"
        - Tryout comparison: "Comparison", plus "Trends" and
          "Normalization Impact" when present

    Args:
        report_data: Report data object
        filename: Base filename (without extension)

    Returns:
        Name of the generated file: ``{filename}_{YYYYmmdd_HHMMSS}.xlsx``

    Raises:
        ValueError: If report_data is not one of the supported report types.
    """
    # Reject unsupported input before a workbook is created, matching the CSV
    # exporter; otherwise the writer would be closed without any sheet.
    supported_types = (
        StudentPerformanceReport,
        ItemAnalysisReport,
        CalibrationStatusReport,
        TryoutComparisonReport,
    )
    if not isinstance(report_data, supported_types):
        raise ValueError(f"Unsupported report type: {type(report_data)}")

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    full_filename = f"{filename}_{timestamp}.xlsx"

    with pd.ExcelWriter(full_filename, engine='openpyxl') as writer:
        if isinstance(report_data, StudentPerformanceReport):
            aggregate = report_data.aggregate

            # Summary sheet
            summary_df = pd.DataFrame([{
                "Report Generated": report_data.generated_at.isoformat(),
                "Tryout ID": report_data.tryout_id,
                "Website ID": report_data.website_id,
                "Participant Count": aggregate.participant_count,
                "Average NM": aggregate.avg_nm,
                "Std Dev NM": aggregate.std_nm,
                "Min NM": aggregate.min_nm,
                "Max NM": aggregate.max_nm,
                "Median NM": aggregate.median_nm,
                "Average NN": aggregate.avg_nn,
                "Std Dev NN": aggregate.std_nn,
                "Average Theta": aggregate.avg_theta,
                "Pass Rate (%)": aggregate.pass_rate,
                "Avg Time (seconds)": aggregate.avg_time_spent,
            }])
            summary_df.to_excel(writer, sheet_name="Summary", index=False)

            # Individual records sheet
            if report_data.individual_records:
                records_df = pd.DataFrame([
                    {
                        "Session ID": r.session_id,
                        "User ID": r.wp_user_id,
                        "NM": r.NM,
                        "NN": r.NN,
                        "Theta": r.theta,
                        "Theta SE": r.theta_se,
                        "Correct Answers": r.total_benar,
                        "Time (seconds)": r.time_spent,
                        "Start Time": r.start_time.isoformat() if r.start_time else None,
                        "End Time": r.end_time.isoformat() if r.end_time else None,
                        "Scoring Mode": r.scoring_mode_used,
                    }
                    for r in report_data.individual_records
                ])
                records_df.to_excel(writer, sheet_name="Individual Records", index=False)

        elif isinstance(report_data, ItemAnalysisReport):
            # Summary sheet
            summary_df = pd.DataFrame([report_data.summary])
            summary_df.to_excel(writer, sheet_name="Summary", index=False)

            # Items sheet
            items_df = pd.DataFrame([
                {
                    "Item ID": r.item_id,
                    "Slot": r.slot,
                    "Level": r.level,
                    "CTT p": r.ctt_p,
                    "CTT Bobot": r.ctt_bobot,
                    "CTT Category": r.ctt_category,
                    "IRT b": r.irt_b,
                    "IRT SE": r.irt_se,
                    "Calibrated": r.calibrated,
                    "Sample Size": r.calibration_sample_size,
                    "Correctness Rate": r.correctness_rate,
                    "Item-Total Corr": r.item_total_correlation,
                    "Optimal Theta Range": r.optimal_theta_range,
                }
                for r in report_data.items
            ])
            items_df.to_excel(writer, sheet_name="Items", index=False)

            # Information functions sheet. Every item is inspected: gating on
            # the FIRST item only dropped the whole sheet whenever that one
            # item happened to have no information values.
            info_records = [
                {
                    "Item ID": r.item_id,
                    "Slot": r.slot,
                    "Theta": theta,
                    "Information": info,
                }
                for r in report_data.items
                if r.information_values
                for theta, info in r.information_values.items()
            ]
            if info_records:
                info_df = pd.DataFrame(info_records)
                info_df.to_excel(writer, sheet_name="Information Functions", index=False)

        elif isinstance(report_data, CalibrationStatusReport):
            # Summary sheet
            summary_df = pd.DataFrame([{
                "Report Generated": report_data.generated_at.isoformat(),
                "Tryout ID": report_data.tryout_id,
                "Total Items": report_data.total_items,
                "Calibrated Items": report_data.calibrated_items,
                "Calibration %": report_data.calibration_percentage,
                "Avg Sample Size": report_data.avg_calibration_sample_size,
                "Est. Time to 90%": report_data.estimated_time_to_90_percent,
                "Ready for IRT": report_data.ready_for_irt_rollout,
            }])
            summary_df.to_excel(writer, sheet_name="Summary", index=False)

            # Items awaiting calibration sheet
            if report_data.items_awaiting_calibration:
                awaiting_df = pd.DataFrame([
                    {
                        "Item ID": r.item_id,
                        "Slot": r.slot,
                        "Level": r.level,
                        "Sample Size": r.sample_size,
                        "Calibrated": r.calibrated,
                        "IRT b": r.irt_b,
                        "CTT p": r.ctt_p,
                    }
                    for r in report_data.items_awaiting_calibration
                ])
                awaiting_df.to_excel(writer, sheet_name="Awaiting Calibration", index=False)

            # All items sheet
            all_items_df = pd.DataFrame([
                {
                    "Item ID": r.item_id,
                    "Slot": r.slot,
                    "Level": r.level,
                    "Sample Size": r.sample_size,
                    "Calibrated": r.calibrated,
                    "IRT b": r.irt_b,
                    "IRT SE": r.irt_se,
                    "CTT p": r.ctt_p,
                }
                for r in report_data.items
            ])
            all_items_df.to_excel(writer, sheet_name="All Items", index=False)

        else:
            # TryoutComparisonReport (the only remaining supported type)
            comparison_df = pd.DataFrame([
                {
                    "Tryout ID": r.tryout_id,
                    "Date": r.date,
                    "Subject": r.subject,
                    "Participants": r.participant_count,
                    "Avg NM": r.avg_nm,
                    "Avg NN": r.avg_nn,
                    "Avg Theta": r.avg_theta,
                    "Std NM": r.std_nm,
                    "Calibration %": r.calibration_percentage,
                }
                for r in report_data.tryouts
            ])
            comparison_df.to_excel(writer, sheet_name="Comparison", index=False)

            # Trends sheet
            if report_data.trends:
                trends_df = pd.DataFrame([report_data.trends])
                trends_df.to_excel(writer, sheet_name="Trends", index=False)

            # Normalization impact sheet
            if report_data.normalization_impact:
                norm_df = pd.DataFrame([
                    {
                        "Tryout ID": tryout_id,
                        "Mode": impact.get("mode"),
                        "Static Rataan": impact.get("static_rataan"),
                        "Static SB": impact.get("static_sb"),
                        "Dynamic Rataan": impact.get("dynamic_rataan"),
                        "Dynamic SB": impact.get("dynamic_sb"),
                    }
                    for tryout_id, impact in report_data.normalization_impact.items()
                ])
                norm_df.to_excel(writer, sheet_name="Normalization Impact", index=False)

    logger.info(f"Exported report to Excel: {full_filename}")
    return full_filename
logger.info(f"Exported report to Excel: {full_filename}") return full_filename def export_report_to_pdf(report_data: Union[StudentPerformanceReport, ItemAnalysisReport, CalibrationStatusReport, TryoutComparisonReport], filename: str) -> str: """ Export report data to PDF format with tables and charts. Args: report_data: Report data object filename: Base filename (without extension) Returns: Full path to generated PDF file """ from reportlab.lib import colors from reportlab.lib.pagesizes import letter, A4 from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle from reportlab.lib.units import inch from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer, PageBreak from reportlab.lib.enums import TA_CENTER, TA_LEFT timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") full_filename = f"{filename}_{timestamp}.pdf" doc = SimpleDocTemplate(full_filename, pagesize=A4) styles = getSampleStyleSheet() # Custom styles title_style = ParagraphStyle( 'CustomTitle', parent=styles['Heading1'], fontSize=16, alignment=TA_CENTER, spaceAfter=20, ) heading_style = ParagraphStyle( 'CustomHeading', parent=styles['Heading2'], fontSize=12, spaceAfter=10, ) elements = [] # Title title = "Report" if isinstance(report_data, StudentPerformanceReport): title = f"Student Performance Report - {report_data.tryout_id}" elif isinstance(report_data, ItemAnalysisReport): title = f"Item Analysis Report - {report_data.tryout_id}" elif isinstance(report_data, CalibrationStatusReport): title = f"Calibration Status Report - {report_data.tryout_id}" elif isinstance(report_data, TryoutComparisonReport): title = "Tryout Comparison Report" elements.append(Paragraph(title, title_style)) elements.append(Paragraph(f"Generated: {report_data.generated_at.strftime('%Y-%m-%d %H:%M:%S UTC')}", styles['Normal'])) elements.append(Spacer(1, 20)) if isinstance(report_data, StudentPerformanceReport): # Summary table elements.append(Paragraph("Summary Statistics", heading_style)) 
summary_data = [ ["Metric", "Value"], ["Participant Count", str(report_data.aggregate.participant_count)], ["Average NM", str(report_data.aggregate.avg_nm or "N/A")], ["Std Dev NM", str(report_data.aggregate.std_nm or "N/A")], ["Min NM", str(report_data.aggregate.min_nm or "N/A")], ["Max NM", str(report_data.aggregate.max_nm or "N/A")], ["Median NM", str(report_data.aggregate.median_nm or "N/A")], ["Average NN", str(report_data.aggregate.avg_nn or "N/A")], ["Pass Rate", f"{report_data.aggregate.pass_rate}%"], ["Avg Time (min)", f"{report_data.aggregate.avg_time_spent / 60:.1f}"], ] summary_table = Table(summary_data, colWidths=[2*inch, 2*inch]) summary_table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.grey), ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke), ('ALIGN', (0, 0), (-1, -1), 'CENTER'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('FONTSIZE', (0, 0), (-1, 0), 10), ('BOTTOMPADDING', (0, 0), (-1, 0), 12), ('BACKGROUND', (0, 1), (-1, -1), colors.beige), ('GRID', (0, 0), (-1, -1), 1, colors.black), ])) elements.append(summary_table) # Individual records (first 20) if report_data.individual_records: elements.append(Spacer(1, 20)) elements.append(Paragraph("Individual Records (Top 20)", heading_style)) records_data = [["User ID", "NM", "NN", "Correct", "Time (min)"]] for r in report_data.individual_records[:20]: records_data.append([ r.wp_user_id[:15] + "..." 
if len(r.wp_user_id) > 15 else r.wp_user_id, str(r.NM or "N/A"), str(r.NN or "N/A"), str(r.total_benar), f"{r.time_spent / 60:.1f}", ]) records_table = Table(records_data, colWidths=[1.5*inch, 0.8*inch, 0.8*inch, 0.8*inch, 1*inch]) records_table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.grey), ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke), ('ALIGN', (0, 0), (-1, -1), 'CENTER'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('FONTSIZE', (0, 0), (-1, -1), 8), ('BOTTOMPADDING', (0, 0), (-1, 0), 8), ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), ])) elements.append(records_table) elif isinstance(report_data, ItemAnalysisReport): # Summary elements.append(Paragraph("Item Analysis Summary", heading_style)) summary_data = [ ["Metric", "Value"], ["Total Items", str(report_data.summary.get("total_items", 0))], ["Calibrated Items", str(report_data.summary.get("calibrated_items", 0))], ["Calibration %", f"{report_data.summary.get('calibration_percentage', 0)}%"], ["Avg Correctness", f"{report_data.summary.get('avg_correctness_rate', 0):.2%}"], ["High Discrimination", str(report_data.summary.get("high_discrimination_items", 0))], ] summary_table = Table(summary_data, colWidths=[2*inch, 2*inch]) summary_table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.grey), ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke), ('ALIGN', (0, 0), (-1, -1), 'CENTER'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('FONTSIZE', (0, 0), (-1, 0), 10), ('BOTTOMPADDING', (0, 0), (-1, 0), 12), ('BACKGROUND', (0, 1), (-1, -1), colors.beige), ('GRID', (0, 0), (-1, -1), 1, colors.black), ])) elements.append(summary_table) # Items table (first 25) elements.append(Spacer(1, 20)) elements.append(Paragraph("Items (First 25)", heading_style)) items_data = [["Slot", "Level", "CTT p", "IRT b", "Calibrated", "Corr Rate"]] for r in report_data.items[:25]: items_data.append([ str(r.slot), r.level, f"{r.ctt_p:.2f}" if r.ctt_p else "N/A", f"{r.irt_b:.2f}" if r.irt_b else 
"N/A", "Yes" if r.calibrated else "No", f"{r.correctness_rate:.2%}", ]) items_table = Table(items_data, colWidths=[0.6*inch, 0.8*inch, 0.8*inch, 0.8*inch, 1*inch, 0.9*inch]) items_table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.grey), ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke), ('ALIGN', (0, 0), (-1, -1), 'CENTER'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('FONTSIZE', (0, 0), (-1, -1), 8), ('BOTTOMPADDING', (0, 0), (-1, 0), 8), ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), ])) elements.append(items_table) elif isinstance(report_data, CalibrationStatusReport): # Summary elements.append(Paragraph("Calibration Status Summary", heading_style)) summary_data = [ ["Metric", "Value"], ["Total Items", str(report_data.total_items)], ["Calibrated Items", str(report_data.calibrated_items)], ["Calibration %", f"{report_data.calibration_percentage}%"], ["Avg Sample Size", f"{report_data.avg_calibration_sample_size:.0f}"], ["Est. Time to 90%", report_data.estimated_time_to_90_percent or "N/A"], ["Ready for IRT", "Yes" if report_data.ready_for_irt_rollout else "No"], ] summary_table = Table(summary_data, colWidths=[2*inch, 2*inch]) summary_table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.grey), ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke), ('ALIGN', (0, 0), (-1, -1), 'CENTER'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('FONTSIZE', (0, 0), (-1, 0), 10), ('BOTTOMPADDING', (0, 0), (-1, 0), 12), ('BACKGROUND', (0, 1), (-1, -1), colors.beige), ('GRID', (0, 0), (-1, -1), 1, colors.black), ])) elements.append(summary_table) # Items awaiting calibration if report_data.items_awaiting_calibration: elements.append(Spacer(1, 20)) elements.append(Paragraph(f"Items Awaiting Calibration ({len(report_data.items_awaiting_calibration)})", heading_style)) await_data = [["Slot", "Level", "Sample Size", "CTT p", "IRT b"]] for r in report_data.items_awaiting_calibration[:25]: await_data.append([ str(r.slot), r.level, str(r.sample_size), 
f"{r.ctt_p:.2f}" if r.ctt_p else "N/A", f"{r.irt_b:.2f}" if r.irt_b else "N/A", ]) await_table = Table(await_data, colWidths=[0.8*inch, 0.8*inch, 1.2*inch, 0.8*inch, 0.8*inch]) await_table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.grey), ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke), ('ALIGN', (0, 0), (-1, -1), 'CENTER'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('FONTSIZE', (0, 0), (-1, -1), 8), ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), ])) elements.append(await_table) elif isinstance(report_data, TryoutComparisonReport): # Comparison table elements.append(Paragraph("Tryout Comparison", heading_style)) comp_data = [["Tryout ID", "Participants", "Avg NM", "Avg NN", "Calib %"]] for r in report_data.tryouts: comp_data.append([ r.tryout_id[:20], str(r.participant_count), f"{r.avg_nm:.1f}" if r.avg_nm else "N/A", f"{r.avg_nn:.1f}" if r.avg_nn else "N/A", f"{r.calibration_percentage:.1f}%", ]) comp_table = Table(comp_data, colWidths=[1.5*inch, 1*inch, 1*inch, 1*inch, 1*inch]) comp_table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.grey), ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke), ('ALIGN', (0, 0), (-1, -1), 'CENTER'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('FONTSIZE', (0, 0), (-1, -1), 9), ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), ])) elements.append(comp_table) # Trends if report_data.trends: elements.append(Spacer(1, 20)) elements.append(Paragraph("Trends Analysis", heading_style)) trends_data = [["Metric", "Value"]] for key, value in report_data.trends.items(): trends_data.append([key.replace("_", " ").title(), str(value)]) trends_table = Table(trends_data, colWidths=[2*inch, 2*inch]) trends_table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.grey), ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke), ('ALIGN', (0, 0), (-1, -1), 'CENTER'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('FONTSIZE', (0, 0), (-1, -1), 9), ('GRID', (0, 0), (-1, -1), 0.5, colors.grey), ])) 
elements.append(trends_table) # Build PDF doc.build(elements) logger.info(f"Exported report to PDF: {full_filename}") return full_filename # ============================================================================= # Report Scheduling Models (for future Celery/APScheduler integration) # ============================================================================= @dataclass class ReportSchedule: """Report schedule configuration.""" schedule_id: str report_type: Literal["student_performance", "item_analysis", "calibration_status", "tryout_comparison"] schedule: Literal["daily", "weekly", "monthly"] tryout_ids: List[str] website_id: int recipients: List[str] format: Literal["csv", "xlsx", "pdf"] = "xlsx" created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) last_run: Optional[datetime] = None next_run: Optional[datetime] = None is_active: bool = True # In-memory store for scheduled reports (in production, use database) _scheduled_reports: Dict[str, ReportSchedule] = {} def schedule_report( report_type: Literal["student_performance", "item_analysis", "calibration_status", "tryout_comparison"], schedule: Literal["daily", "weekly", "monthly"], tryout_ids: List[str], website_id: int, recipients: List[str], export_format: Literal["csv", "xlsx", "pdf"] = "xlsx" ) -> str: """ Schedule a report for automatic generation. 
Args: report_type: Type of report to generate schedule: Schedule frequency tryout_ids: List of tryout IDs for the report website_id: Website identifier recipients: List of email addresses to send report to export_format: Export format for the report Returns: Schedule ID """ import uuid schedule_id = str(uuid.uuid4()) # Calculate next run time now = datetime.now(timezone.utc) if schedule == "daily": next_run = now + timedelta(days=1) elif schedule == "weekly": next_run = now + timedelta(weeks=1) else: # monthly next_run = now + timedelta(days=30) report_schedule = ReportSchedule( schedule_id=schedule_id, report_type=report_type, schedule=schedule, tryout_ids=tryout_ids, website_id=website_id, recipients=recipients, format=export_format, next_run=next_run, ) _scheduled_reports[schedule_id] = report_schedule logger.info(f"Scheduled report {schedule_id}: {report_type} {schedule}") return schedule_id def get_scheduled_report(schedule_id: str) -> Optional[ReportSchedule]: """Get a scheduled report by ID.""" return _scheduled_reports.get(schedule_id) def list_scheduled_reports(website_id: Optional[int] = None) -> List[ReportSchedule]: """List all scheduled reports, optionally filtered by website.""" reports = list(_scheduled_reports.values()) if website_id: reports = [r for r in reports if r.website_id == website_id] return reports def cancel_scheduled_report(schedule_id: str) -> bool: """Cancel a scheduled report.""" if schedule_id in _scheduled_reports: del _scheduled_reports[schedule_id] logger.info(f"Cancelled scheduled report {schedule_id}") return True return False # Export public API __all__ = [ # Report generation functions "generate_student_performance_report", "generate_item_analysis_report", "generate_calibration_status_report", "generate_tryout_comparison_report", # Export functions "export_report_to_csv", "export_report_to_excel", "export_report_to_pdf", # Report data classes "StudentPerformanceReport", "StudentPerformanceRecord", "AggregatePerformanceStats", 
"ItemAnalysisReport", "ItemAnalysisRecord", "CalibrationStatusReport", "CalibrationItemStatus", "TryoutComparisonReport", "TryoutComparisonRecord", # Scheduling "ReportSchedule", "schedule_report", "get_scheduled_report", "list_scheduled_reports", "cancel_scheduled_report", ]