""" Excel Import/Export Service for Question Migration. Handles import from standardized Excel format with: - Row 2: KUNCI (answer key) - Row 4: TK (tingkat kesukaran p-value) - Row 5: BOBOT (weight 1-p) - Rows 6+: Individual question data Ensures 100% data integrity with comprehensive validation. """ import os from datetime import datetime from typing import Any, Dict, List, Optional import openpyxl from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.models.item import Item from app.services.ctt_scoring import ( convert_ctt_p_to_irt_b, categorize_difficulty, ) def validate_excel_structure(file_path: str) -> Dict[str, Any]: """ Validate Excel file structure against required format. Checks: - File exists and is valid Excel (.xlsx) - Sheet "CONTOH" exists - Required rows exist (Row 2 KUNCI, Row 4 TK, Row 5 BOBOT) - Question data rows have required columns Args: file_path: Path to Excel file Returns: Dict with: - valid: bool - Whether structure is valid - errors: List[str] - Validation errors if any """ errors: List[str] = [] # Check file exists if not os.path.exists(file_path): return {"valid": False, "errors": [f"File not found: {file_path}"]} # Check file extension if not file_path.lower().endswith('.xlsx'): return {"valid": False, "errors": ["File must be .xlsx format"]} try: wb = openpyxl.load_workbook(file_path, data_only=False) except Exception as e: return {"valid": False, "errors": [f"Failed to load Excel file: {str(e)}"]} # Check sheet "CONTOH" exists if "CONTOH" not in wb.sheetnames: return { "valid": False, "errors": ['Sheet "CONTOH" not found. Available sheets: ' + ", ".join(wb.sheetnames)] } ws = wb["CONTOH"] # Check minimum rows exist if ws.max_row < 6: errors.append(f"Excel file must have at least 6 rows (found {ws.max_row})") # Check Row 2 exists (KUNCI) if ws.max_row < 2: errors.append("Row 2 (KUNCI - answer key) is required") # Check Row 4 exists (TK - p-values) if ws.max_row < 4: errors.append("Row 4 (TK - p-values) is required") # Check Row 5 exists (BOBOT - weights) if ws.max_row < 5: errors.append("Row 5 (BOBOT - weights) is required") # Check question data rows exist (6+) if ws.max_row < 6: errors.append("Question data rows (6+) are required") # Check minimum columns (at least slot, level, soal_text, options, correct_answer) if ws.max_column < 8: errors.append( f"Excel file must have at least 8 columns (found {ws.max_column}). " "Expected: slot, level, soal_text, options_A, options_B, options_C, options_D, correct_answer" ) # Check KUNCI row has values if ws.max_row >= 2: kunce_row_values = [ws.cell(2, col).value for col in range(4, ws.max_column + 1)] if not any(v for v in kunce_row_values if v and v != "KUNCI"): errors.append("Row 2 (KUNCI) must contain answer key values") # Check TK row has numeric values if ws.max_row >= 4: wb_data = openpyxl.load_workbook(file_path, data_only=True) ws_data = wb_data["CONTOH"] tk_row_values = [ws_data.cell(4, col).value for col in range(4, ws.max_column + 1)] if not any(v for v in tk_row_values if isinstance(v, (int, float))): errors.append("Row 4 (TK) must contain numeric p-values") # Check BOBOT row has numeric values if ws.max_row >= 5: wb_data = openpyxl.load_workbook(file_path, data_only=True) ws_data = wb_data["CONTOH"] bobot_row_values = [ws_data.cell(5, col).value for col in range(4, ws.max_column + 1)] if not any(v for v in bobot_row_values if isinstance(v, (int, float))): errors.append("Row 5 (BOBOT) must contain numeric weight values") return {"valid": len(errors) == 0, "errors": errors} def parse_excel_import( file_path: str, website_id: int, tryout_id: str ) -> Dict[str, Any]: """ Parse Excel file and extract items with full validation. Excel structure: - Sheet name: "CONTOH" - Row 2: KUNCI (answer key) - extract correct answers per slot - Row 4: TK (tingkat kesukaran p-value) - extract p-values per slot - Row 5: BOBOT (weight 1-p) - extract bobot per slot - Rows 6+: Individual question data Args: file_path: Path to Excel file website_id: Website identifier tryout_id: Tryout identifier Returns: Dict with: - items: List[Dict] - Parsed items ready for database - validation_errors: List[str] - Any validation errors - items_count: int - Number of items parsed """ # First validate structure validation = validate_excel_structure(file_path) if not validation["valid"]: return { "items": [], "validation_errors": validation["errors"], "items_count": 0 } items: List[Dict[str, Any]] = [] errors: List[str] = [] try: # Load workbook twice: once with formulas, once with data_only wb = openpyxl.load_workbook(file_path, data_only=False) ws = wb["CONTOH"] wb_data = openpyxl.load_workbook(file_path, data_only=True) ws_data = wb_data["CONTOH"] # Extract answer key from Row 2 answer_key: Dict[int, str] = {} for col in range(4, ws.max_column + 1): key_cell = ws.cell(2, col).value if key_cell and key_cell != "KUNCI": slot_num = col - 3 # Column 4 -> slot 1 answer_key[slot_num] = str(key_cell).strip().upper() # Extract p-values from Row 4 p_values: Dict[int, float] = {} for col in range(4, ws.max_column + 1): slot_num = col - 3 if slot_num in answer_key: p_cell = ws_data.cell(4, col).value if p_cell and isinstance(p_cell, (int, float)): p_values[slot_num] = float(p_cell) # Extract bobot from Row 5 bobot_values: Dict[int, float] = {} for col in range(4, ws.max_column + 1): slot_num = col - 3 if slot_num in answer_key: bobot_cell = ws_data.cell(5, col).value if bobot_cell and isinstance(bobot_cell, (int, float)): bobot_values[slot_num] = float(bobot_cell) # Parse question data rows (6+) for row_idx in range(6, ws.max_row + 1): # Column mapping (based on project-brief): # Column 1 (A): slot (question number) # Column 2 (B): level (mudah/sedang/sulit) # Column 3 (C): soal_text (question stem) # Column 4 (D): options_A # Column 5 (E): options_B # Column 6 (F): options_C # Column 7 (G): options_D # Column 8 (H): correct_answer slot_cell = ws.cell(row_idx, 1).value level_cell = ws.cell(row_idx, 2).value soal_text_cell = ws.cell(row_idx, 3).value option_a = ws.cell(row_idx, 4).value option_b = ws.cell(row_idx, 5).value option_c = ws.cell(row_idx, 6).value option_d = ws.cell(row_idx, 7).value correct_cell = ws.cell(row_idx, 8).value # Skip empty rows if not slot_cell and not soal_text_cell: continue # Validate required fields if not slot_cell: errors.append(f"Row {row_idx}: Missing slot value") continue slot_num = int(slot_cell) if isinstance(slot_cell, (int, float)) else None if slot_num is None: try: slot_num = int(str(slot_cell).strip()) except (ValueError, AttributeError): errors.append(f"Row {row_idx}: Invalid slot value: {slot_cell}") continue # Get or infer level if not level_cell: # Use p-value from Row 4 to determine level p_val = p_values.get(slot_num, 0.5) level_val = categorize_difficulty(p_val) else: level_val = str(level_cell).strip().lower() if level_val not in ["mudah", "sedang", "sulit"]: errors.append( f"Row {row_idx}: Invalid level '{level_cell}'. Must be 'mudah', 'sedang', or 'sulit'" ) continue # Validate soal_text if not soal_text_cell: errors.append(f"Row {row_idx} (slot {slot_num}): Missing soal_text (question stem)") continue # Build options JSON options: Dict[str, str] = {} if option_a: options["A"] = str(option_a).strip() if option_b: options["B"] = str(option_b).strip() if option_c: options["C"] = str(option_c).strip() if option_d: options["D"] = str(option_d).strip() if len(options) < 4: errors.append( f"Row {row_idx} (slot {slot_num}): Missing options. Expected 4 options (A, B, C, D)" ) continue # Get correct answer if not correct_cell: # Fall back to answer key from Row 2 correct_ans = answer_key.get(slot_num) if not correct_ans: errors.append( f"Row {row_idx} (slot {slot_num}): Missing correct_answer and no answer key found" ) continue else: correct_ans = str(correct_cell).strip().upper() if correct_ans not in ["A", "B", "C", "D"]: errors.append( f"Row {row_idx} (slot {slot_num}): Invalid correct_answer '{correct_ans}'. Must be A, B, C, or D" ) continue # Get CTT parameters p_val = p_values.get(slot_num, 0.5) bobot_val = bobot_values.get(slot_num, 1.0 - p_val) # Validate p-value range if p_val < 0 or p_val > 1: errors.append( f"Slot {slot_num}: Invalid p-value {p_val}. Must be in range [0, 1]" ) continue # Validate bobot range if bobot_val < 0 or bobot_val > 1: errors.append( f"Slot {slot_num}: Invalid bobot {bobot_val}. Must be in range [0, 1]" ) continue # Calculate CTT category and IRT b parameter ctt_cat = categorize_difficulty(p_val) irt_b = convert_ctt_p_to_irt_b(p_val) # Build item dict item = { "tryout_id": tryout_id, "website_id": website_id, "slot": slot_num, "level": level_val, "stem": str(soal_text_cell).strip(), "options": options, "correct_answer": correct_ans, "explanation": None, "ctt_p": p_val, "ctt_bobot": bobot_val, "ctt_category": ctt_cat, "irt_b": irt_b, "irt_se": None, "calibrated": False, "calibration_sample_size": 0, "generated_by": "manual", "ai_model": None, "basis_item_id": None, } items.append(item) return { "items": items, "validation_errors": errors, "items_count": len(items) } except Exception as e: return { "items": [], "validation_errors": [f"Parsing error: {str(e)}"], "items_count": 0 } async def bulk_insert_items( items_list: List[Dict[str, Any]], db: AsyncSession ) -> Dict[str, Any]: """ Bulk insert items with duplicate detection. Skips duplicates based on (tryout_id, website_id, slot). Args: items_list: List of item dictionaries to insert db: Async SQLAlchemy database session Returns: Dict with: - inserted_count: int - Number of items inserted - duplicate_count: int - Number of duplicates skipped - errors: List[str] - Any errors during insertion """ inserted_count = 0 duplicate_count = 0 errors: List[str] = [] try: for item_data in items_list: # Check for duplicate result = await db.execute( select(Item).where( Item.tryout_id == item_data["tryout_id"], Item.website_id == item_data["website_id"], Item.slot == item_data["slot"] ) ) existing = result.scalar_one_or_none() if existing: duplicate_count += 1 continue # Create new item item = Item(**item_data) db.add(item) inserted_count += 1 # Commit all inserts await db.commit() return { "inserted_count": inserted_count, "duplicate_count": duplicate_count, "errors": errors } except Exception as e: await db.rollback() return { "inserted_count": 0, "duplicate_count": duplicate_count, "errors": [f"Insertion failed: {str(e)}"] } async def export_questions_to_excel( tryout_id: str, website_id: int, db: AsyncSession, output_path: Optional[str] = None ) -> str: """ Export questions to Excel in standardized format. Creates Excel workbook with: - Sheet "CONTOH" - Row 2: KUNCI (answer key) - Row 4: TK (p-values) - Row 5: BOBOT (weights) - Rows 6+: Question data Args: tryout_id: Tryout identifier website_id: Website identifier db: Async SQLAlchemy database session output_path: Optional output file path. If not provided, generates temp file. Returns: Path to exported Excel file """ # Fetch all items for this tryout result = await db.execute( select(Item).filter( Item.tryout_id == tryout_id, Item.website_id == website_id ).order_by(Item.slot) ) items = result.scalars().all() if not items: raise ValueError(f"No items found for tryout_id={tryout_id}, website_id={website_id}") # Create workbook wb = openpyxl.Workbook() ws = wb.active ws.title = "CONTOH" # Determine max slot for column sizing max_slot = max(item.slot for item in items) # Row 1: Header ws.cell(1, 1, "No") ws.cell(1, 2, "Level") ws.cell(1, 3, "Soal") for slot_idx in range(max_slot): col = slot_idx + 4 ws.cell(1, col, f"Soal {slot_idx + 1}") # Row 2: KUNCI (answer key) ws.cell(2, 1, "") ws.cell(2, 2, "") ws.cell(2, 3, "KUNCI") for item in items: col = item.slot + 3 ws.cell(2, col, item.correct_answer) # Row 3: Empty ws.cell(3, 1, "") ws.cell(3, 2, "") ws.cell(3, 3, "") # Row 4: TK (p-values) ws.cell(4, 1, "") ws.cell(4, 2, "") ws.cell(4, 3, "TK") for item in items: col = item.slot + 3 ws.cell(4, col, item.ctt_p or 0.5) # Row 5: BOBOT (weights) ws.cell(5, 1, "") ws.cell(5, 2, "") ws.cell(5, 3, "BOBOT") for item in items: col = item.slot + 3 ws.cell(5, col, item.ctt_bobot or (1.0 - (item.ctt_p or 0.5))) # Rows 6+: Question data row_idx = 6 for item in items: # Column 1: Slot number ws.cell(row_idx, 1, item.slot) # Column 2: Level ws.cell(row_idx, 2, item.level) # Column 3: Soal text (stem) ws.cell(row_idx, 3, item.stem) # Columns 4+: Options options = item.options or {} ws.cell(row_idx, 4, options.get("A", "")) ws.cell(row_idx, 5, options.get("B", "")) ws.cell(row_idx, 6, options.get("C", "")) ws.cell(row_idx, 7, options.get("D", "")) # Column 8: Correct answer ws.cell(row_idx, 8, item.correct_answer) row_idx += 1 # Generate output path if not provided if output_path is None: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_path = f"/tmp/tryout_{tryout_id}_export_{timestamp}.xlsx" # Save workbook wb.save(output_path) return output_path