"""
|
|
Importer for Sejoli tryout JSON snapshot payloads.
|
|
|
|
This importer stores snapshots as read-only reference data. It does not create
|
|
or overwrite operational items, because the exported JSON does not currently
|
|
contain the full option text needed for the live item bank.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.models import TryoutImportSnapshot, TryoutSnapshotQuestion, Website
|
|
|
|
SOURCE_FORMAT = "sejoli_json"
|
|
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
|
|
|
|
|
|
class TryoutImportError(ValueError):
    """Raised when the incoming payload is structurally invalid.

    Subclasses ``ValueError`` so generic callers that catch ``ValueError``
    still handle import failures.
    """
|
@dataclass
class QuestionDiffSummary:
    """Per-tryout counts comparing incoming questions to stored snapshot rows."""

    # Number of questions present in the incoming payload for this tryout.
    total_questions: int
    # Incoming questions with no stored row for the same source question id.
    new_questions: int
    # Incoming questions whose content checksum differs from the stored row.
    updated_questions: int
    # Incoming questions whose content checksum matches the stored row.
    unchanged_questions: int
    # Active stored rows that are absent from the incoming payload.
    removed_questions: int
    # Incoming questions whose exported options carry no text/label.
    missing_option_labels: int
|
@dataclass
class TryoutPreview:
    """Preview of how importing one tryout would affect stored snapshot data."""

    # Tryout id taken from the export's 'info.id', falling back to the key.
    source_tryout_id: str
    # Key under which the tryout appears in the payload's 'tryouts' object.
    source_key: str
    # Tryout title from 'info.title', falling back to the source key.
    title: str
    # Public permalink from the export info, if present.
    permalink: str | None
    # Counts of new/updated/unchanged/removed questions for this tryout.
    question_diff: QuestionDiffSummary
    # Human-readable notes, e.g. about questions missing option text.
    warnings: list[str]
|
def _parse_datetime(value: str | None) -> datetime | None:
    """Parse a Sejoli export timestamp into an aware UTC datetime.

    The export uses naive ``YYYY-MM-DD HH:MM:SS`` strings; they are tagged
    as UTC on parse.

    Args:
        value: Raw timestamp string from the payload, or ``None``/empty.

    Returns:
        An aware UTC ``datetime``, or ``None`` when the value is missing.

    Raises:
        TryoutImportError: If the value does not match ``DATETIME_FORMAT``.
    """
    if not value:
        return None
    try:
        parsed = datetime.strptime(value, DATETIME_FORMAT)
    except ValueError as exc:
        # Surface malformed timestamps as a structured payload error instead
        # of letting strptime's bare ValueError escape; TryoutImportError is
        # itself a ValueError subclass, so existing callers keep working.
        raise TryoutImportError(f"Invalid datetime value: {value!r}") from exc
    return parsed.replace(tzinfo=timezone.utc)
|
def _sha256(value: Any) -> str:
|
|
payload = json.dumps(value, sort_keys=True, ensure_ascii=False)
|
|
return hashlib.sha256(payload.encode("utf-8")).hexdigest()
|
|
|
|
|
|
def _validate_root(payload: dict[str, Any]) -> dict[str, Any]:
|
|
if not isinstance(payload, dict):
|
|
raise TryoutImportError("Payload must be a JSON object.")
|
|
if "tryouts" not in payload or not isinstance(payload["tryouts"], dict) or not payload["tryouts"]:
|
|
raise TryoutImportError("Payload must contain a non-empty 'tryouts' object.")
|
|
return payload
|
|
|
|
|
|
def _extract_tryout_previews(payload: dict[str, Any]) -> list[tuple[str, dict[str, Any]]]:
|
|
return list(payload["tryouts"].items())
|
|
|
|
|
|
def _normalize_question(question: dict[str, Any]) -> dict[str, Any]:
    """Normalize a raw exported question into the snapshot row shape.

    Strips whitespace, upper-cases the answer key, records whether any
    option carries visible text, and computes a SHA-256 content checksum
    over the fields that define the question's identity.
    """
    options = question.get("options") or []

    def _option_has_label(option: Any) -> bool:
        # Only dict options are considered; a label may live under either
        # 'text' or 'label'.
        if not isinstance(option, dict):
            return False
        label = option.get("text") or option.get("label") or ""
        return bool(label.strip())

    labelled = any(_option_has_label(option) for option in options)

    def _clean(key: str) -> str:
        return str(question.get(key) or "").strip()

    title = _clean("title")
    body = _clean("question")
    explanation = _clean("explanation") or None
    answer = _clean("answer").upper()
    category_name = _clean("category_name") or None
    category_code = _clean("category_code") or None
    category_id = question.get("category_id")

    # Checksum covers only the identity-defining fields, serialized as
    # canonical JSON (sorted keys, non-ASCII kept verbatim).
    checksum_source = {
        "title": title,
        "question": body,
        "explanation": explanation,
        "correct_answer": answer,
        "category_id": category_id,
        "category_name": category_name,
        "category_code": category_code,
        "raw_options": options,
    }
    serialized = json.dumps(checksum_source, sort_keys=True, ensure_ascii=False)
    checksum = hashlib.sha256(serialized.encode("utf-8")).hexdigest()

    return {
        "source_question_id": str(question.get("id", "")),
        "title": title,
        "question": body,
        "explanation": explanation,
        "correct_answer": answer,
        "category_id": category_id,
        "category_name": category_name,
        "category_code": category_code,
        "raw_options": options,
        "option_count": len(options),
        "has_option_labels": labelled,
        "raw_payload": question,
        "content_checksum": checksum,
    }
|
async def ensure_website_exists(db: AsyncSession, website_id: int) -> Website:
    """Fetch the Website row for *website_id*, raising if it is absent.

    Raises:
        TryoutImportError: If no website with that id is registered.
    """
    query = select(Website).where(Website.id == website_id)
    website = (await db.execute(query)).scalar_one_or_none()
    if website is not None:
        return website
    raise TryoutImportError(
        f"Website {website_id} not found. Register the website in the backend first; this is not configured via .env."
    )
|
async def preview_tryout_json_import(payload: dict[str, Any], website_id: int, db: AsyncSession) -> dict[str, Any]:
    """Compute a non-mutating diff preview for a Sejoli tryout JSON payload.

    Validates the payload shape and the target website, then compares each
    tryout's questions against the stored snapshot rows for that website.
    Only reads from the database; nothing is written.

    Args:
        payload: Parsed Sejoli export JSON (must contain a 'tryouts' object).
        website_id: Target website the snapshot would belong to.
        db: Async SQLAlchemy session used for read-only lookups.

    Returns:
        A dict with the source format, per-tryout diff summaries, and
        aggregate totals across all tryouts.

    Raises:
        TryoutImportError: If the payload is malformed or the website is
            not registered.
    """
    _validate_root(payload)
    await ensure_website_exists(db, website_id)

    tryout_previews: list[TryoutPreview] = []
    total_new = total_updated = total_unchanged = total_removed = total_missing_labels = 0

    for source_key, tryout_payload in _extract_tryout_previews(payload):
        info = tryout_payload.get("info") or {}
        # Fall back to the payload key when the export omits an id/title.
        source_tryout_id = str(info.get("id") or source_key)
        title = str(info.get("title") or source_key)
        questions = tryout_payload.get("questions") or []
        normalized_questions = [_normalize_question(q) for q in questions]

        # Existing snapshot rows for this tryout, keyed by source question id.
        existing_result = await db.execute(
            select(TryoutSnapshotQuestion).where(
                TryoutSnapshotQuestion.website_id == website_id,
                TryoutSnapshotQuestion.source_tryout_id == source_tryout_id,
            )
        )
        existing_questions = {
            row.source_question_id: row
            for row in existing_result.scalars().all()
        }

        new_questions = updated_questions = unchanged_questions = 0
        missing_option_labels = 0
        incoming_ids: set[str] = set()

        for question in normalized_questions:
            incoming_ids.add(question["source_question_id"])
            existing = existing_questions.get(question["source_question_id"])
            if question["has_option_labels"] is False:
                missing_option_labels += 1
            # Classify by presence and content checksum, mirroring the
            # upsert logic in import_tryout_json_snapshot.
            if existing is None:
                new_questions += 1
            elif existing.content_checksum != question["content_checksum"]:
                updated_questions += 1
            else:
                unchanged_questions += 1

        # Active stored rows that no longer appear in the incoming payload.
        removed_questions = sum(1 for question_id, row in existing_questions.items() if row.is_active and question_id not in incoming_ids)

        warnings: list[str] = []
        if missing_option_labels:
            warnings.append(
                f"{missing_option_labels} question(s) have no exported option text in the JSON; import will store raw reference data only."
            )

        summary = QuestionDiffSummary(
            total_questions=len(normalized_questions),
            new_questions=new_questions,
            updated_questions=updated_questions,
            unchanged_questions=unchanged_questions,
            removed_questions=removed_questions,
            missing_option_labels=missing_option_labels,
        )

        total_new += new_questions
        total_updated += updated_questions
        total_unchanged += unchanged_questions
        total_removed += removed_questions
        total_missing_labels += missing_option_labels

        tryout_previews.append(
            TryoutPreview(
                source_tryout_id=source_tryout_id,
                source_key=source_key,
                title=title,
                permalink=info.get("permalink"),
                question_diff=summary,
                warnings=warnings,
            )
        )

    return {
        "source_format": SOURCE_FORMAT,
        "tryout_count": len(tryout_previews),
        "totals": {
            "new_questions": total_new,
            "updated_questions": total_updated,
            "unchanged_questions": total_unchanged,
            "removed_questions": total_removed,
            "missing_option_labels": total_missing_labels,
        },
        "tryouts": [
            {
                "source_tryout_id": preview.source_tryout_id,
                "source_key": preview.source_key,
                "title": preview.title,
                "permalink": preview.permalink,
                # __dict__ flattens the dataclass into a JSON-friendly dict.
                "question_diff": preview.question_diff.__dict__,
                "warnings": preview.warnings,
            }
            for preview in tryout_previews
        ],
    }
|
async def import_tryout_json_snapshot(payload: dict[str, Any], website_id: int, db: AsyncSession) -> dict[str, Any]:
    """Persist a Sejoli tryout JSON payload as read-only snapshot data.

    For each tryout in the payload this creates a new ``TryoutImportSnapshot``
    row holding the raw payload, then upserts one ``TryoutSnapshotQuestion``
    row per question (keyed by website + source tryout id + source question
    id). Questions absent from the incoming payload are soft-deleted by
    setting ``is_active = False``; rows are never physically removed.

    The session is flushed but not committed — the caller owns the
    transaction boundary.

    Args:
        payload: Parsed Sejoli export JSON (must contain a 'tryouts' object).
        website_id: Target website the snapshot belongs to.
        db: Async SQLAlchemy session the rows are written to.

    Returns:
        A dict with the diff preview, per-tryout import counts, and a
        human-readable message.

    Raises:
        TryoutImportError: If the payload is malformed or the website is
            not registered (raised by the preview step, before any write).
    """
    # Running the preview first performs all validation up front.
    preview = await preview_tryout_json_import(payload, website_id, db)
    export_info = payload.get("export_info") or {}

    imported_tryouts: list[dict[str, Any]] = []

    for source_key, tryout_payload in _extract_tryout_previews(payload):
        info = tryout_payload.get("info") or {}
        # Fall back to the payload key when the export omits an id/title.
        source_tryout_id = str(info.get("id") or source_key)
        title = str(info.get("title") or source_key)
        questions = tryout_payload.get("questions") or []
        results = tryout_payload.get("results") or []
        normalized_questions = [_normalize_question(q) for q in questions]

        snapshot = TryoutImportSnapshot(
            website_id=website_id,
            source_tryout_id=source_tryout_id,
            source_key=source_key,
            title=title,
            source_permalink=info.get("permalink"),
            source_status=info.get("status"),
            exported_at=_parse_datetime(export_info.get("exported_at")),
            source_created_at=_parse_datetime(info.get("created_date")),
            source_modified_at=_parse_datetime(info.get("modified_date")),
            exported_by=export_info.get("exported_by"),
            question_count=len(questions),
            result_count=len(results),
            payload_checksum=_sha256(tryout_payload),
            raw_payload=tryout_payload,
        )
        db.add(snapshot)
        # Flush so snapshot.id is assigned before question rows reference it.
        await db.flush()

        # Existing snapshot rows for this tryout, keyed by source question id.
        existing_result = await db.execute(
            select(TryoutSnapshotQuestion).where(
                TryoutSnapshotQuestion.website_id == website_id,
                TryoutSnapshotQuestion.source_tryout_id == source_tryout_id,
            )
        )
        existing_questions = {
            row.source_question_id: row
            for row in existing_result.scalars().all()
        }

        now = datetime.now(timezone.utc)
        incoming_ids: set[str] = set()
        new_questions = updated_questions = unchanged_questions = 0

        for question in normalized_questions:
            source_question_id = question["source_question_id"]
            incoming_ids.add(source_question_id)
            existing = existing_questions.get(source_question_id)
            if existing is None:
                # First time this question is seen: create a fresh row.
                row = TryoutSnapshotQuestion(
                    website_id=website_id,
                    source_tryout_id=source_tryout_id,
                    source_question_id=source_question_id,
                    latest_snapshot_id=snapshot.id,
                    question_title=question["title"] or question["question"],
                    question_html=question["question"],
                    explanation_html=question["explanation"],
                    raw_options=question["raw_options"],
                    correct_answer=question["correct_answer"],
                    category_id=question["category_id"],
                    category_name=question["category_name"],
                    category_code=question["category_code"],
                    option_count=question["option_count"],
                    has_option_labels=question["has_option_labels"],
                    is_active=True,
                    content_checksum=question["content_checksum"],
                    raw_payload=question["raw_payload"],
                    last_seen_at=now,
                )
                db.add(row)
                new_questions += 1
                continue

            if existing.content_checksum != question["content_checksum"]:
                # Content changed since the last import: overwrite the row.
                existing.question_title = question["title"] or question["question"]
                existing.question_html = question["question"]
                existing.explanation_html = question["explanation"]
                existing.raw_options = question["raw_options"]
                existing.correct_answer = question["correct_answer"]
                existing.category_id = question["category_id"]
                existing.category_name = question["category_name"]
                existing.category_code = question["category_code"]
                existing.option_count = question["option_count"]
                existing.has_option_labels = question["has_option_labels"]
                existing.content_checksum = question["content_checksum"]
                existing.raw_payload = question["raw_payload"]
                updated_questions += 1
            else:
                unchanged_questions += 1

            # Seen in this import: point at the new snapshot and (re)activate.
            existing.latest_snapshot_id = snapshot.id
            existing.is_active = True
            existing.last_seen_at = now

        # Soft-delete active rows that are absent from the incoming payload.
        removed_questions = 0
        for source_question_id, existing in existing_questions.items():
            if existing.is_active and source_question_id not in incoming_ids:
                existing.is_active = False
                existing.latest_snapshot_id = snapshot.id
                existing.last_seen_at = now
                removed_questions += 1

        imported_tryouts.append(
            {
                "snapshot_id": snapshot.id,
                "source_tryout_id": source_tryout_id,
                "title": title,
                "new_questions": new_questions,
                "updated_questions": updated_questions,
                "unchanged_questions": unchanged_questions,
                "removed_questions": removed_questions,
                "question_count": len(normalized_questions),
            }
        )

    await db.flush()

    return {
        "source_format": SOURCE_FORMAT,
        "website_id": website_id,
        "preview": preview,
        "imported_tryouts": imported_tryouts,
        "message": "Tryout JSON snapshot imported as read-only reference data.",
    }