Files
yellow-bank-soal/app/services/excel_import.py
Dwindi Ramadhana cf193d7ea0 first commit
2026-03-21 23:32:59 +07:00

522 lines
16 KiB
Python

"""
Excel Import/Export Service for Question Migration.
Handles import from standardized Excel format with:
- Row 2: KUNCI (answer key)
- Row 4: TK (tingkat kesukaran p-value)
- Row 5: BOBOT (weight 1-p)
- Rows 6+: Individual question data
Ensures 100% data integrity with comprehensive validation.
"""
import os
from datetime import datetime
from typing import Any, Dict, List, Optional
import openpyxl
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.item import Item
from app.services.ctt_scoring import (
convert_ctt_p_to_irt_b,
categorize_difficulty,
)
def validate_excel_structure(file_path: str) -> Dict[str, Any]:
    """
    Validate Excel file structure against required format.

    Checks, in order:
    - File exists and has a .xlsx extension
    - Workbook can be loaded by openpyxl
    - Sheet "CONTOH" exists
    - Required rows exist (Row 2 KUNCI, Row 4 TK, Row 5 BOBOT, Rows 6+ questions)
    - At least 8 columns are present
    - Row 2 has at least one answer-key value, and Rows 4 and 5 each have
      at least one numeric value (columns 4 onward)

    The file/extension/load/sheet checks return immediately on failure;
    every later check accumulates its error so the caller sees all problems
    at once.

    Args:
        file_path: Path to Excel file

    Returns:
        Dict with:
        - valid: bool - Whether structure is valid
        - errors: List[str] - Validation errors if any
    """
    errors: List[str] = []

    # Check file exists
    if not os.path.exists(file_path):
        return {"valid": False, "errors": [f"File not found: {file_path}"]}

    # Check file extension
    if not file_path.lower().endswith('.xlsx'):
        return {"valid": False, "errors": ["File must be .xlsx format"]}

    try:
        wb = openpyxl.load_workbook(file_path, data_only=False)
    except Exception as e:
        return {"valid": False, "errors": [f"Failed to load Excel file: {str(e)}"]}

    # Check sheet "CONTOH" exists
    if "CONTOH" not in wb.sheetnames:
        return {
            "valid": False,
            "errors": ['Sheet "CONTOH" not found. Available sheets: ' + ", ".join(wb.sheetnames)]
        }

    ws = wb["CONTOH"]
    # Read the dimensions once; the cell lookups below stay inside them.
    max_row = ws.max_row
    max_column = ws.max_column

    # Check minimum rows exist
    if max_row < 6:
        errors.append(f"Excel file must have at least 6 rows (found {max_row})")

    # Check Row 2 exists (KUNCI)
    if max_row < 2:
        errors.append("Row 2 (KUNCI - answer key) is required")

    # Check Row 4 exists (TK - p-values)
    if max_row < 4:
        errors.append("Row 4 (TK - p-values) is required")

    # Check Row 5 exists (BOBOT - weights)
    if max_row < 5:
        errors.append("Row 5 (BOBOT - weights) is required")

    # Check question data rows exist (6+)
    if max_row < 6:
        errors.append("Question data rows (6+) are required")

    # Check minimum columns (at least slot, level, soal_text, options, correct_answer)
    if max_column < 8:
        errors.append(
            f"Excel file must have at least 8 columns (found {max_column}). "
            "Expected: slot, level, soal_text, options_A, options_B, options_C, options_D, correct_answer"
        )

    slot_columns = range(4, max_column + 1)

    # Check KUNCI row has values (read from the formula workbook so a
    # formula cell still counts as a value)
    if max_row >= 2:
        kunci_row_values = [ws.cell(2, col).value for col in slot_columns]
        if not any(v and v != "KUNCI" for v in kunci_row_values):
            errors.append("Row 2 (KUNCI) must contain answer key values")

    # TK and BOBOT are checked against computed values, so a second,
    # data_only load is needed. Load it once for both rows.
    if max_row >= 4:
        try:
            wb_data = openpyxl.load_workbook(file_path, data_only=True)
        except Exception as e:
            return {"valid": False, "errors": [f"Failed to load Excel file: {str(e)}"]}
        ws_data = wb_data["CONTOH"]

        def has_numeric(row: int) -> bool:
            # bool is a subclass of int; a TRUE/FALSE cell is not a number.
            return any(
                isinstance(v, (int, float)) and not isinstance(v, bool)
                for v in (ws_data.cell(row, col).value for col in slot_columns)
            )

        # Check TK row has numeric values
        if not has_numeric(4):
            errors.append("Row 4 (TK) must contain numeric p-values")

        # Check BOBOT row has numeric values
        if max_row >= 5 and not has_numeric(5):
            errors.append("Row 5 (BOBOT) must contain numeric weight values")

    return {"valid": len(errors) == 0, "errors": errors}
def _is_number(value: Any) -> bool:
    """Return True for int/float cell values; bool (an int subclass) is excluded."""
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def _read_numeric_row(
    ws_data: Any,
    row: int,
    max_column: int,
    slots: Dict[int, str]
) -> Dict[int, float]:
    """Collect the numeric cells of ``row`` (columns 4+) keyed by slot, for slots in ``slots``."""
    values: Dict[int, float] = {}
    for col in range(4, max_column + 1):
        slot_num = col - 3  # Column 4 -> slot 1
        if slot_num not in slots:
            continue
        cell = ws_data.cell(row, col).value
        # Test the type, not truthiness: 0 / 0.0 is a legitimate value.
        if _is_number(cell):
            values[slot_num] = float(cell)
    return values


def _coerce_slot(raw: Any) -> Optional[int]:
    """Convert a slot cell to int; return None when it is not a whole number."""
    if isinstance(raw, bool):
        return None
    if isinstance(raw, int):
        return raw
    if isinstance(raw, float):
        # Reject 2.7 (and nan/inf) instead of silently truncating to 2.
        return int(raw) if raw.is_integer() else None
    try:
        return int(str(raw).strip())
    except ValueError:
        return None


def parse_excel_import(
    file_path: str,
    website_id: int,
    tryout_id: str
) -> Dict[str, Any]:
    """
    Parse Excel file and extract items with full validation.

    Excel structure:
    - Sheet name: "CONTOH"
    - Row 2: KUNCI (answer key) - extract correct answers per slot
    - Row 4: TK (tingkat kesukaran p-value) - extract p-values per slot
    - Row 5: BOBOT (weight 1-p) - extract bobot per slot
    - Rows 6+: Individual question data

    The structure is validated first via validate_excel_structure; if that
    fails, no rows are parsed. A row that fails validation is reported in
    validation_errors and skipped; the remaining rows are still parsed.
    Any unexpected exception while parsing discards all items and is
    reported as a single "Parsing error".

    Args:
        file_path: Path to Excel file
        website_id: Website identifier
        tryout_id: Tryout identifier

    Returns:
        Dict with:
        - items: List[Dict] - Parsed items ready for database
        - validation_errors: List[str] - Any validation errors
        - items_count: int - Number of items parsed
    """
    # First validate structure
    validation = validate_excel_structure(file_path)
    if not validation["valid"]:
        return {
            "items": [],
            "validation_errors": validation["errors"],
            "items_count": 0
        }

    items: List[Dict[str, Any]] = []
    errors: List[str] = []
    valid_levels = ("mudah", "sedang", "sulit")
    valid_answers = ("A", "B", "C", "D")

    try:
        # Load workbook twice: once with formulas, once with data_only
        wb = openpyxl.load_workbook(file_path, data_only=False)
        ws = wb["CONTOH"]
        wb_data = openpyxl.load_workbook(file_path, data_only=True)
        ws_data = wb_data["CONTOH"]
        max_row = ws.max_row
        max_column = ws.max_column

        # Extract answer key from Row 2
        answer_key: Dict[int, str] = {}
        for col in range(4, max_column + 1):
            key_cell = ws.cell(2, col).value
            if key_cell and key_cell != "KUNCI":
                answer_key[col - 3] = str(key_cell).strip().upper()  # Column 4 -> slot 1

        # Extract p-values (Row 4) and bobot (Row 5) for slots that have a key
        p_values = _read_numeric_row(ws_data, 4, max_column, answer_key)
        bobot_values = _read_numeric_row(ws_data, 5, max_column, answer_key)

        # Parse question data rows (6+)
        for row_idx in range(6, max_row + 1):
            # Column mapping (based on project-brief):
            # Column 1 (A): slot (question number)
            # Column 2 (B): level (mudah/sedang/sulit)
            # Column 3 (C): soal_text (question stem)
            # Column 4 (D): options_A
            # Column 5 (E): options_B
            # Column 6 (F): options_C
            # Column 7 (G): options_D
            # Column 8 (H): correct_answer
            slot_cell = ws.cell(row_idx, 1).value
            level_cell = ws.cell(row_idx, 2).value
            soal_text_cell = ws.cell(row_idx, 3).value
            option_cells = {
                letter: ws.cell(row_idx, col).value
                for letter, col in (("A", 4), ("B", 5), ("C", 6), ("D", 7))
            }
            correct_cell = ws.cell(row_idx, 8).value

            # Skip empty rows
            if not slot_cell and not soal_text_cell:
                continue

            # Validate required fields
            if not slot_cell:
                errors.append(f"Row {row_idx}: Missing slot value")
                continue

            slot_num = _coerce_slot(slot_cell)
            if slot_num is None:
                errors.append(f"Row {row_idx}: Invalid slot value: {slot_cell}")
                continue

            # Get CTT parameters; bobot defaults to 1 - p when Row 5 is blank
            p_val = p_values.get(slot_num, 0.5)
            bobot_val = bobot_values.get(slot_num, 1.0 - p_val)

            # Get or infer level
            if not level_cell:
                # Use p-value from Row 4 to determine level
                level_val = categorize_difficulty(p_val)
            else:
                level_val = str(level_cell).strip().lower()
                if level_val not in valid_levels:
                    errors.append(
                        f"Row {row_idx}: Invalid level '{level_cell}'. Must be 'mudah', 'sedang', or 'sulit'"
                    )
                    continue

            # Validate soal_text
            if not soal_text_cell:
                errors.append(f"Row {row_idx} (slot {slot_num}): Missing soal_text (question stem)")
                continue

            # Build options JSON
            options: Dict[str, str] = {
                letter: str(value).strip()
                for letter, value in option_cells.items()
                if value
            }
            if len(options) < 4:
                errors.append(
                    f"Row {row_idx} (slot {slot_num}): Missing options. Expected 4 options (A, B, C, D)"
                )
                continue

            # Get correct answer
            if not correct_cell:
                # Fall back to answer key from Row 2
                correct_ans = answer_key.get(slot_num)
                if not correct_ans:
                    errors.append(
                        f"Row {row_idx} (slot {slot_num}): Missing correct_answer and no answer key found"
                    )
                    continue
            else:
                correct_ans = str(correct_cell).strip().upper()

            # Applies to the Row 2 fallback as well as the explicit cell
            if correct_ans not in valid_answers:
                errors.append(
                    f"Row {row_idx} (slot {slot_num}): Invalid correct_answer '{correct_ans}'. Must be A, B, C, or D"
                )
                continue

            # Validate p-value range
            if p_val < 0 or p_val > 1:
                errors.append(
                    f"Slot {slot_num}: Invalid p-value {p_val}. Must be in range [0, 1]"
                )
                continue

            # Validate bobot range
            if bobot_val < 0 or bobot_val > 1:
                errors.append(
                    f"Slot {slot_num}: Invalid bobot {bobot_val}. Must be in range [0, 1]"
                )
                continue

            # Calculate CTT category and IRT b parameter. A numeric failure
            # here is reported for this slot only, not for the whole import.
            try:
                ctt_cat = categorize_difficulty(p_val)
                irt_b = convert_ctt_p_to_irt_b(p_val)
            except (ValueError, ZeroDivisionError, OverflowError) as exc:
                errors.append(
                    f"Slot {slot_num}: Failed to derive CTT/IRT parameters from p-value {p_val}: {exc}"
                )
                continue

            # Build item dict
            items.append({
                "tryout_id": tryout_id,
                "website_id": website_id,
                "slot": slot_num,
                "level": level_val,
                "stem": str(soal_text_cell).strip(),
                "options": options,
                "correct_answer": correct_ans,
                "explanation": None,
                "ctt_p": p_val,
                "ctt_bobot": bobot_val,
                "ctt_category": ctt_cat,
                "irt_b": irt_b,
                "irt_se": None,
                "calibrated": False,
                "calibration_sample_size": 0,
                "generated_by": "manual",
                "ai_model": None,
                "basis_item_id": None,
            })

        return {
            "items": items,
            "validation_errors": errors,
            "items_count": len(items)
        }
    except Exception as e:
        return {
            "items": [],
            "validation_errors": [f"Parsing error: {str(e)}"],
            "items_count": 0
        }
async def bulk_insert_items(
    items_list: List[Dict[str, Any]],
    db: AsyncSession
) -> Dict[str, Any]:
    """
    Bulk insert items with duplicate detection.

    Skips duplicates based on (tryout_id, website_id, slot). An item counts
    as a duplicate when a row with the same key already exists in the
    database or when an earlier item in items_list has the same key.

    All inserts are committed together at the end. If anything fails, the
    session is rolled back and nothing is inserted.

    Args:
        items_list: List of item dictionaries to insert
        db: Async SQLAlchemy database session

    Returns:
        Dict with:
        - inserted_count: int - Number of items inserted (0 on failure)
        - duplicate_count: int - Number of duplicates skipped
        - errors: List[str] - Any errors during insertion
    """
    inserted_count = 0
    duplicate_count = 0
    errors: List[str] = []
    # Keys already queued in this batch. Tracking them here means in-batch
    # duplicates are caught even when the session does not autoflush
    # pending inserts before each SELECT.
    seen_keys = set()

    try:
        for item_data in items_list:
            key = (
                item_data["tryout_id"],
                item_data["website_id"],
                item_data["slot"],
            )
            if key in seen_keys:
                duplicate_count += 1
                continue
            seen_keys.add(key)

            # Check for duplicate already stored in the database
            result = await db.execute(
                select(Item).where(
                    Item.tryout_id == key[0],
                    Item.website_id == key[1],
                    Item.slot == key[2]
                )
            )
            # first() rather than scalar_one_or_none(): if the table already
            # holds several rows for this key, skip the item instead of
            # raising and aborting the whole batch.
            if result.scalars().first() is not None:
                duplicate_count += 1
                continue

            # Create new item
            db.add(Item(**item_data))
            inserted_count += 1

        # Commit all inserts
        await db.commit()
        return {
            "inserted_count": inserted_count,
            "duplicate_count": duplicate_count,
            "errors": errors
        }
    except Exception as e:
        await db.rollback()
        return {
            "inserted_count": 0,
            "duplicate_count": duplicate_count,
            "errors": [f"Insertion failed: {str(e)}"]
        }
async def export_questions_to_excel(
    tryout_id: str,
    website_id: int,
    db: AsyncSession,
    output_path: Optional[str] = None
) -> str:
    """
    Export questions to Excel in standardized format.

    Creates Excel workbook with:
    - Sheet "CONTOH"
    - Row 1: Header ("No", "Level", "Soal", then "Soal N" per slot)
    - Row 2: KUNCI (answer key)
    - Row 4: TK (p-values)
    - Row 5: BOBOT (weights)
    - Rows 6+: Question data

    Per-slot values in rows 2, 4 and 5 are written to column slot + 3.
    A missing (None) ctt_p is exported as 0.5 and a missing ctt_bobot as
    1 - p; stored values, including 0.0, are exported unchanged.

    Args:
        tryout_id: Tryout identifier
        website_id: Website identifier
        db: Async SQLAlchemy database session
        output_path: Optional output file path. If not provided, a
            timestamped file is created in the system temp directory.

    Returns:
        Path to exported Excel file

    Raises:
        ValueError: If no items exist for the given tryout and website.
    """
    import tempfile

    # Fetch all items for this tryout
    result = await db.execute(
        select(Item).filter(
            Item.tryout_id == tryout_id,
            Item.website_id == website_id
        ).order_by(Item.slot)
    )
    items = result.scalars().all()
    if not items:
        raise ValueError(f"No items found for tryout_id={tryout_id}, website_id={website_id}")

    # Create workbook
    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "CONTOH"

    # Determine max slot for column sizing
    max_slot = max(item.slot for item in items)

    # Row 1: Header
    ws.cell(1, 1, "No")
    ws.cell(1, 2, "Level")
    ws.cell(1, 3, "Soal")
    for slot in range(1, max_slot + 1):
        ws.cell(1, slot + 3, f"Soal {slot}")

    # Rows 2-5: blank columns 1-2, label in column 3 (Row 3 stays empty)
    for row, label in ((2, "KUNCI"), (3, ""), (4, "TK"), (5, "BOBOT")):
        ws.cell(row, 1, "")
        ws.cell(row, 2, "")
        ws.cell(row, 3, label)

    # Per-slot answer key, p-value and weight
    for item in items:
        col = item.slot + 3
        # Compare against None, not truthiness: 0.0 is a valid stored value
        # and must not be replaced by the default.
        p_val = item.ctt_p if item.ctt_p is not None else 0.5
        bobot_val = item.ctt_bobot if item.ctt_bobot is not None else 1.0 - p_val
        ws.cell(2, col, item.correct_answer)
        ws.cell(4, col, p_val)
        ws.cell(5, col, bobot_val)

    # Rows 6+: Question data
    for row_idx, item in enumerate(items, start=6):
        options = item.options or {}
        ws.cell(row_idx, 1, item.slot)            # Column 1: Slot number
        ws.cell(row_idx, 2, item.level)           # Column 2: Level
        ws.cell(row_idx, 3, item.stem)            # Column 3: Soal text (stem)
        ws.cell(row_idx, 4, options.get("A", ""))  # Columns 4-7: Options
        ws.cell(row_idx, 5, options.get("B", ""))
        ws.cell(row_idx, 6, options.get("C", ""))
        ws.cell(row_idx, 7, options.get("D", ""))
        ws.cell(row_idx, 8, item.correct_answer)  # Column 8: Correct answer

    # Generate output path if not provided
    if output_path is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_path = os.path.join(
            tempfile.gettempdir(),
            f"tryout_{tryout_id}_export_{timestamp}.xlsx"
        )

    # Save workbook
    wb.save(output_path)
    return output_path