""" Importer for Sejoli tryout JSON snapshot payloads. This importer stores snapshots as read-only reference data. It does not create or overwrite operational items, because the exported JSON does not currently contain the full option text needed for the live item bank. """ from __future__ import annotations import hashlib import json from dataclasses import dataclass from datetime import datetime, timezone from typing import Any from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.models import Item, TryoutImportSnapshot, TryoutSnapshotQuestion, Website SOURCE_FORMAT = "sejoli_json" DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S" class TryoutImportError(ValueError): """Raised when the incoming payload is structurally invalid.""" @dataclass class QuestionDiffSummary: total_questions: int new_questions: int updated_questions: int unchanged_questions: int removed_questions: int missing_option_labels: int @dataclass class TryoutPreview: source_tryout_id: str source_key: str title: str permalink: str | None question_diff: QuestionDiffSummary warnings: list[str] def _parse_datetime(value: str | None) -> datetime | None: if not value: return None return datetime.strptime(value, DATETIME_FORMAT).replace(tzinfo=timezone.utc) def _sha256(value: Any) -> str: payload = json.dumps(value, sort_keys=True, ensure_ascii=False) return hashlib.sha256(payload.encode("utf-8")).hexdigest() def _validate_root(payload: dict[str, Any]) -> dict[str, Any]: if not isinstance(payload, dict): raise TryoutImportError("Payload must be a JSON object.") if "tryouts" not in payload or not isinstance(payload["tryouts"], dict) or not payload["tryouts"]: raise TryoutImportError("Payload must contain a non-empty 'tryouts' object.") return payload def _extract_tryout_previews(payload: dict[str, Any]) -> list[tuple[str, dict[str, Any]]]: return list(payload["tryouts"].items()) def _normalize_question(question: dict[str, Any]) -> dict[str, Any]: raw_options = question.get("options") or [] has_option_labels = any( bool(((opt or {}).get("text") or (opt or {}).get("label") or "").strip()) for opt in raw_options if isinstance(opt, dict) ) normalized = { "source_question_id": str(question.get("id", "")), "title": str(question.get("title") or "").strip(), "question": str(question.get("question") or "").strip(), "explanation": str(question.get("explanation") or "").strip() or None, "correct_answer": str(question.get("answer") or "").strip().upper(), "category_id": question.get("category_id"), "category_name": str(question.get("category_name") or "").strip() or None, "category_code": str(question.get("category_code") or "").strip() or None, "raw_options": raw_options, "option_count": len(raw_options), "has_option_labels": has_option_labels, "raw_payload": question, } normalized["content_checksum"] = _sha256( { "title": normalized["title"], "question": normalized["question"], "explanation": normalized["explanation"], "correct_answer": normalized["correct_answer"], "category_id": normalized["category_id"], "category_name": normalized["category_name"], "category_code": normalized["category_code"], "raw_options": normalized["raw_options"], } ) return normalized async def ensure_website_exists(db: AsyncSession, website_id: int) -> Website: result = await db.execute(select(Website).where(Website.id == website_id)) website = result.scalar_one_or_none() if website is None: raise TryoutImportError( f"Website {website_id} not found. Register the website in the backend first; this is not configured via .env." ) return website async def preview_tryout_json_import(payload: dict[str, Any], website_id: int, db: AsyncSession) -> dict[str, Any]: _validate_root(payload) await ensure_website_exists(db, website_id) tryout_previews: list[TryoutPreview] = [] total_new = total_updated = total_unchanged = total_removed = total_missing_labels = 0 for source_key, tryout_payload in _extract_tryout_previews(payload): info = tryout_payload.get("info") or {} source_tryout_id = str(info.get("id") or source_key) title = str(info.get("title") or source_key) questions = tryout_payload.get("questions") or [] normalized_questions = [_normalize_question(q) for q in questions] existing_result = await db.execute( select(TryoutSnapshotQuestion).where( TryoutSnapshotQuestion.website_id == website_id, TryoutSnapshotQuestion.source_tryout_id == source_tryout_id, ) ) existing_questions = { row.source_question_id: row for row in existing_result.scalars().all() } new_questions = updated_questions = unchanged_questions = 0 missing_option_labels = 0 incoming_ids: set[str] = set() for question in normalized_questions: incoming_ids.add(question["source_question_id"]) existing = existing_questions.get(question["source_question_id"]) if question["has_option_labels"] is False: missing_option_labels += 1 if existing is None: new_questions += 1 elif existing.content_checksum != question["content_checksum"]: updated_questions += 1 else: unchanged_questions += 1 removed_questions = sum(1 for question_id, row in existing_questions.items() if row.is_active and question_id not in incoming_ids) warnings: list[str] = [] if missing_option_labels: warnings.append( f"{missing_option_labels} question(s) have no exported option text in the JSON; import will store raw reference data only." ) summary = QuestionDiffSummary( total_questions=len(normalized_questions), new_questions=new_questions, updated_questions=updated_questions, unchanged_questions=unchanged_questions, removed_questions=removed_questions, missing_option_labels=missing_option_labels, ) total_new += new_questions total_updated += updated_questions total_unchanged += unchanged_questions total_removed += removed_questions total_missing_labels += missing_option_labels tryout_previews.append( TryoutPreview( source_tryout_id=source_tryout_id, source_key=source_key, title=title, permalink=info.get("permalink"), question_diff=summary, warnings=warnings, ) ) return { "source_format": SOURCE_FORMAT, "tryout_count": len(tryout_previews), "totals": { "new_questions": total_new, "updated_questions": total_updated, "unchanged_questions": total_unchanged, "removed_questions": total_removed, "missing_option_labels": total_missing_labels, }, "tryouts": [ { "source_tryout_id": preview.source_tryout_id, "source_key": preview.source_key, "title": preview.title, "permalink": preview.permalink, "question_diff": preview.question_diff.__dict__, "warnings": preview.warnings, } for preview in tryout_previews ], } async def import_tryout_json_snapshot(payload: dict[str, Any], website_id: int, db: AsyncSession) -> dict[str, Any]: preview = await preview_tryout_json_import(payload, website_id, db) export_info = payload.get("export_info") or {} imported_tryouts: list[dict[str, Any]] = [] for source_key, tryout_payload in _extract_tryout_previews(payload): info = tryout_payload.get("info") or {} source_tryout_id = str(info.get("id") or source_key) title = str(info.get("title") or source_key) questions = tryout_payload.get("questions") or [] results = tryout_payload.get("results") or [] normalized_questions = [_normalize_question(q) for q in questions] snapshot = TryoutImportSnapshot( website_id=website_id, source_tryout_id=source_tryout_id, source_key=source_key, title=title, source_permalink=info.get("permalink"), source_status=info.get("status"), exported_at=_parse_datetime(export_info.get("exported_at")), source_created_at=_parse_datetime(info.get("created_date")), source_modified_at=_parse_datetime(info.get("modified_date")), exported_by=export_info.get("exported_by"), question_count=len(questions), result_count=len(results), payload_checksum=_sha256(tryout_payload), raw_payload=tryout_payload, ) db.add(snapshot) await db.flush() existing_result = await db.execute( select(TryoutSnapshotQuestion).where( TryoutSnapshotQuestion.website_id == website_id, TryoutSnapshotQuestion.source_tryout_id == source_tryout_id, ) ) existing_questions = { row.source_question_id: row for row in existing_result.scalars().all() } now = datetime.now(timezone.utc) incoming_ids: set[str] = set() new_questions = updated_questions = unchanged_questions = 0 for question in normalized_questions: source_question_id = question["source_question_id"] incoming_ids.add(source_question_id) existing = existing_questions.get(source_question_id) if existing is None: row = TryoutSnapshotQuestion( website_id=website_id, source_tryout_id=source_tryout_id, source_question_id=source_question_id, latest_snapshot_id=snapshot.id, question_title=question["title"] or question["question"], question_html=question["question"], explanation_html=question["explanation"], raw_options=question["raw_options"], correct_answer=question["correct_answer"], category_id=question["category_id"], category_name=question["category_name"], category_code=question["category_code"], option_count=question["option_count"], has_option_labels=question["has_option_labels"], is_active=True, content_checksum=question["content_checksum"], raw_payload=question["raw_payload"], last_seen_at=now, ) db.add(row) new_questions += 1 continue content_changed = existing.content_checksum != question["content_checksum"] if content_changed: existing.question_title = question["title"] or question["question"] existing.question_html = question["question"] existing.explanation_html = question["explanation"] existing.raw_options = question["raw_options"] existing.correct_answer = question["correct_answer"] existing.category_id = question["category_id"] existing.category_name = question["category_name"] existing.category_code = question["category_code"] existing.option_count = question["option_count"] existing.has_option_labels = question["has_option_labels"] existing.content_checksum = question["content_checksum"] existing.raw_payload = question["raw_payload"] updated_questions += 1 else: unchanged_questions += 1 existing.latest_snapshot_id = snapshot.id existing.is_active = True existing.last_seen_at = now # If source content changed, mark AI children derived from this source as stale. if content_changed: stale_variants_result = await db.execute( select(Item).where( Item.generated_by == "ai", Item.source_snapshot_question_id == existing.id, Item.variant_status.in_(["draft", "approved", "active"]), ) ) for variant in stale_variants_result.scalars().all(): variant.variant_status = "stale" removed_questions = 0 for source_question_id, existing in existing_questions.items(): if existing.is_active and source_question_id not in incoming_ids: existing.is_active = False existing.latest_snapshot_id = snapshot.id existing.last_seen_at = now removed_questions += 1 stale_removed_result = await db.execute( select(Item).where( Item.generated_by == "ai", Item.source_snapshot_question_id == existing.id, Item.variant_status.in_(["draft", "approved", "active"]), ) ) for variant in stale_removed_result.scalars().all(): variant.variant_status = "stale" imported_tryouts.append( { "snapshot_id": snapshot.id, "source_tryout_id": source_tryout_id, "title": title, "new_questions": new_questions, "updated_questions": updated_questions, "unchanged_questions": unchanged_questions, "removed_questions": removed_questions, "question_count": len(normalized_questions), } ) await db.flush() return { "source_format": SOURCE_FORMAT, "website_id": website_id, "preview": preview, "imported_tryouts": imported_tryouts, "message": "Tryout JSON snapshot imported as read-only reference data.", }