Add Sejoli tryout JSON snapshot importer

This commit is contained in:
dwindown
2026-04-02 17:04:01 +07:00
parent 51c577be05
commit b4ebdc9c4f
7 changed files with 910 additions and 1 deletions

View File

@@ -0,0 +1,118 @@
"""add tryout JSON snapshot tables
Revision ID: 20260402_000002
Revises: 20260331_000001
Create Date: 2026-04-02 11:30:00
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
revision: str = "20260402_000002"
down_revision: Union[str, None] = "20260331_000001"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Create the snapshot archive table, the normalized snapshot-question
    table, and their lookup indexes."""
    op.create_table(
        "tryout_import_snapshots",
        sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
        sa.Column("website_id", sa.Integer(), nullable=False),
        sa.Column("source_tryout_id", sa.String(length=255), nullable=False),
        sa.Column("source_key", sa.String(length=255), nullable=False),
        sa.Column("title", sa.String(length=255), nullable=False),
        sa.Column("source_permalink", sa.String(length=1024), nullable=True),
        sa.Column("source_status", sa.String(length=50), nullable=True),
        sa.Column("exported_at", sa.DateTime(timezone=True), nullable=True),
        sa.Column("source_created_at", sa.DateTime(timezone=True), nullable=True),
        sa.Column("source_modified_at", sa.DateTime(timezone=True), nullable=True),
        sa.Column("exported_by", sa.String(length=255), nullable=True),
        sa.Column("question_count", sa.Integer(), nullable=False),
        sa.Column("result_count", sa.Integer(), nullable=False),
        sa.Column("payload_checksum", sa.String(length=64), nullable=False),
        sa.Column("raw_payload", sa.JSON(), nullable=False),
        sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
        sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
        sa.ForeignKeyConstraint(["website_id"], ["websites.id"], ondelete="CASCADE", onupdate="CASCADE"),
        sa.PrimaryKeyConstraint("id"),
    )
    # Non-unique lookup indexes on the snapshot archive table.
    for index_name, column_name in (
        ("ix_tryout_import_snapshots_website_id", "website_id"),
        ("ix_tryout_import_snapshots_source_tryout_id", "source_tryout_id"),
    ):
        op.create_index(index_name, "tryout_import_snapshots", [column_name], unique=False)
    op.create_table(
        "tryout_snapshot_questions",
        sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
        sa.Column("website_id", sa.Integer(), nullable=False),
        sa.Column("source_tryout_id", sa.String(length=255), nullable=False),
        sa.Column("source_question_id", sa.String(length=255), nullable=False),
        sa.Column("latest_snapshot_id", sa.Integer(), nullable=True),
        sa.Column("question_title", sa.Text(), nullable=False),
        sa.Column("question_html", sa.Text(), nullable=False),
        sa.Column("explanation_html", sa.Text(), nullable=True),
        sa.Column("raw_options", sa.JSON(), nullable=False),
        sa.Column("correct_answer", sa.String(length=10), nullable=False),
        sa.Column("category_id", sa.Integer(), nullable=True),
        sa.Column("category_name", sa.String(length=255), nullable=True),
        sa.Column("category_code", sa.String(length=255), nullable=True),
        sa.Column("option_count", sa.Integer(), nullable=False),
        sa.Column("has_option_labels", sa.Boolean(), nullable=False),
        sa.Column("is_active", sa.Boolean(), nullable=False),
        sa.Column("content_checksum", sa.String(length=64), nullable=False),
        sa.Column("raw_payload", sa.JSON(), nullable=False),
        sa.Column("first_seen_at", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
        sa.Column("last_seen_at", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
        sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
        sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False),
        sa.ForeignKeyConstraint(["website_id"], ["websites.id"], ondelete="CASCADE", onupdate="CASCADE"),
        sa.ForeignKeyConstraint(["latest_snapshot_id"], ["tryout_import_snapshots.id"], ondelete="SET NULL", onupdate="CASCADE"),
        sa.PrimaryKeyConstraint("id"),
        sa.UniqueConstraint(
            "website_id",
            "source_tryout_id",
            "source_question_id",
            name="uq_snapshot_questions_website_tryout_question",
        ),
    )
    # Non-unique lookup indexes on the normalized questions table.
    for index_name, column_name in (
        ("ix_tryout_snapshot_questions_website_id", "website_id"),
        ("ix_tryout_snapshot_questions_source_tryout_id", "source_tryout_id"),
        ("ix_tryout_snapshot_questions_latest_snapshot_id", "latest_snapshot_id"),
    ):
        op.create_index(index_name, "tryout_snapshot_questions", [column_name], unique=False)
def downgrade() -> None:
    """Drop both snapshot tables, removing each table's indexes first and the
    dependent questions table before the snapshots table it references."""
    question_indexes = (
        "ix_tryout_snapshot_questions_latest_snapshot_id",
        "ix_tryout_snapshot_questions_source_tryout_id",
        "ix_tryout_snapshot_questions_website_id",
    )
    for index_name in question_indexes:
        op.drop_index(index_name, table_name="tryout_snapshot_questions")
    op.drop_table("tryout_snapshot_questions")
    snapshot_indexes = (
        "ix_tryout_import_snapshots_source_tryout_id",
        "ix_tryout_import_snapshots_website_id",
    )
    for index_name in snapshot_indexes:
        op.drop_index(index_name, table_name="tryout_import_snapshots")
    op.drop_table("tryout_import_snapshots")

View File

@@ -8,6 +8,8 @@ from app.database import Base
from app.models.item import Item
from app.models.session import Session
from app.models.tryout import Tryout
from app.models.tryout_import_snapshot import TryoutImportSnapshot
from app.models.tryout_snapshot_question import TryoutSnapshotQuestion
from app.models.tryout_stats import TryoutStats
from app.models.user import User
from app.models.user_answer import UserAnswer
@@ -18,6 +20,8 @@ __all__ = [
"User", "User",
"Website", "Website",
"Tryout", "Tryout",
"TryoutImportSnapshot",
"TryoutSnapshotQuestion",
"Item", "Item",
"Session", "Session",
"UserAnswer", "UserAnswer",

View File

@@ -0,0 +1,103 @@
"""
Snapshot archive for imported external tryout payloads.
Stores each imported JSON export so the backend can trace source changes
without treating the source file itself as the system of record.
"""
from datetime import datetime
from typing import Optional
from sqlalchemy import DateTime, ForeignKey, Integer, JSON, String, func
from sqlalchemy.orm import Mapped, mapped_column
from app.database import Base
class TryoutImportSnapshot(Base):
    """One archived import of a single external (Sejoli) tryout payload.

    Each row keeps the complete raw JSON for one tryout plus denormalized
    header metadata, so source changes can be traced over time without
    treating the export file itself as the system of record.
    """

    __tablename__ = "tryout_import_snapshots"

    # Surrogate primary key.
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    website_id: Mapped[int] = mapped_column(
        ForeignKey("websites.id", ondelete="CASCADE", onupdate="CASCADE"),
        nullable=False,
        index=True,
        comment="Website identifier",
    )
    source_tryout_id: Mapped[str] = mapped_column(
        String(255),
        nullable=False,
        index=True,
        comment="External source tryout identifier",
    )
    source_key: Mapped[str] = mapped_column(
        String(255),
        nullable=False,
        comment="External tryout object key in source payload",
    )
    title: Mapped[str] = mapped_column(
        String(255),
        nullable=False,
        comment="Imported tryout title",
    )
    source_permalink: Mapped[Optional[str]] = mapped_column(
        String(1024),
        nullable=True,
        comment="Imported source permalink",
    )
    source_status: Mapped[Optional[str]] = mapped_column(
        String(50),
        nullable=True,
        comment="Imported source status",
    )
    exported_at: Mapped[Optional[datetime]] = mapped_column(
        DateTime(timezone=True),
        nullable=True,
        comment="Timestamp from source export metadata",
    )
    source_created_at: Mapped[Optional[datetime]] = mapped_column(
        DateTime(timezone=True),
        nullable=True,
        comment="Source tryout created timestamp",
    )
    source_modified_at: Mapped[Optional[datetime]] = mapped_column(
        DateTime(timezone=True),
        nullable=True,
        comment="Source tryout modified timestamp",
    )
    exported_by: Mapped[Optional[str]] = mapped_column(
        String(255),
        nullable=True,
        comment="Source exporter identity",
    )
    question_count: Mapped[int] = mapped_column(
        Integer,
        nullable=False,
        default=0,
        comment="Number of questions in imported payload",
    )
    result_count: Mapped[int] = mapped_column(
        Integer,
        nullable=False,
        default=0,
        comment="Number of result rows in imported payload",
    )
    # String(64) fits a hex-encoded SHA-256 digest.
    payload_checksum: Mapped[str] = mapped_column(
        String(64),
        nullable=False,
        comment="Checksum for the imported payload",
    )
    raw_payload: Mapped[dict] = mapped_column(
        JSON,
        nullable=False,
        comment="Original imported payload",
    )
    # Row bookkeeping timestamps, populated by database-side defaults.
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), nullable=False, server_default=func.now()
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
        onupdate=func.now(),
    )

View File

@@ -0,0 +1,139 @@
"""
Read-only normalized reference rows for imported tryout questions.
These rows reflect the latest imported source version of each question and are
kept separate from operational items and AI-generated variants.
"""
from datetime import datetime
from typing import Optional
from sqlalchemy import Boolean, DateTime, ForeignKey, Integer, JSON, String, Text, UniqueConstraint, func
from sqlalchemy.orm import Mapped, mapped_column
from app.database import Base
class TryoutSnapshotQuestion(Base):
    """Normalized, read-only reference row for one imported source question.

    Reflects the latest imported source version of a question, keyed by
    (website_id, source_tryout_id, source_question_id); kept separate from
    operational items and AI-generated variants.
    """

    __tablename__ = "tryout_snapshot_questions"

    # Surrogate primary key.
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    website_id: Mapped[int] = mapped_column(
        ForeignKey("websites.id", ondelete="CASCADE", onupdate="CASCADE"),
        nullable=False,
        index=True,
        comment="Website identifier",
    )
    source_tryout_id: Mapped[str] = mapped_column(
        String(255),
        nullable=False,
        index=True,
        comment="External source tryout identifier",
    )
    source_question_id: Mapped[str] = mapped_column(
        String(255),
        nullable=False,
        comment="External source question identifier",
    )
    # Nulled (not deleted) if the referenced snapshot row is removed.
    latest_snapshot_id: Mapped[Optional[int]] = mapped_column(
        ForeignKey("tryout_import_snapshots.id", ondelete="SET NULL", onupdate="CASCADE"),
        nullable=True,
        index=True,
        comment="Latest snapshot containing this question",
    )
    question_title: Mapped[str] = mapped_column(
        Text,
        nullable=False,
        comment="Imported title or short label",
    )
    question_html: Mapped[str] = mapped_column(
        Text,
        nullable=False,
        comment="Imported question body HTML",
    )
    explanation_html: Mapped[Optional[str]] = mapped_column(
        Text,
        nullable=True,
        comment="Imported explanation HTML",
    )
    raw_options: Mapped[list] = mapped_column(
        JSON,
        nullable=False,
        comment="Raw source options payload",
    )
    correct_answer: Mapped[str] = mapped_column(
        String(10),
        nullable=False,
        comment="Imported correct answer key",
    )
    category_id: Mapped[Optional[int]] = mapped_column(
        Integer,
        nullable=True,
        comment="Imported category id",
    )
    category_name: Mapped[Optional[str]] = mapped_column(
        String(255),
        nullable=True,
        comment="Imported category name",
    )
    category_code: Mapped[Optional[str]] = mapped_column(
        String(255),
        nullable=True,
        comment="Imported category code",
    )
    option_count: Mapped[int] = mapped_column(
        Integer,
        nullable=False,
        default=0,
        comment="Count of source options",
    )
    has_option_labels: Mapped[bool] = mapped_column(
        Boolean,
        nullable=False,
        default=False,
        comment="Whether source options include visible labels",
    )
    is_active: Mapped[bool] = mapped_column(
        Boolean,
        nullable=False,
        default=True,
        comment="Whether question is still present in latest source import",
    )
    # String(64) fits a hex-encoded SHA-256 digest.
    content_checksum: Mapped[str] = mapped_column(
        String(64),
        nullable=False,
        comment="Checksum of normalized question content",
    )
    raw_payload: Mapped[dict] = mapped_column(
        JSON,
        nullable=False,
        comment="Original source question payload",
    )
    # Import lifecycle timestamps (database-side defaults on insert).
    first_seen_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )
    last_seen_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), nullable=False, server_default=func.now()
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
        onupdate=func.now(),
    )
    __table_args__ = (
        UniqueConstraint(
            "website_id",
            "source_tryout_id",
            "source_question_id",
            name="uq_snapshot_questions_website_tryout_question",
        ),
    )

View File

@@ -1,14 +1,17 @@
""" """
Import/Export API router for Excel question migration. Import/Export API router for migration and snapshot ingestion.
Endpoints: Endpoints:
- POST /api/v1/import/preview: Preview Excel import without saving - POST /api/v1/import/preview: Preview Excel import without saving
- POST /api/v1/import/questions: Import questions from Excel to database - POST /api/v1/import/questions: Import questions from Excel to database
- GET /api/v1/export/questions: Export questions to Excel file - GET /api/v1/export/questions: Export questions to Excel file
- POST /api/v1/import-export/tryout-json/preview: Preview Sejoli tryout JSON import
- POST /api/v1/import-export/tryout-json: Import Sejoli tryout JSON as read-only snapshot
""" """
import os import os
import tempfile import tempfile
import json
from typing import Optional from typing import Optional
from fastapi import APIRouter, Depends, File, Form, Header, HTTPException, UploadFile, status from fastapi import APIRouter, Depends, File, Form, Header, HTTPException, UploadFile, status
@@ -16,12 +19,18 @@ from fastapi.responses import FileResponse
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.models import Website
from app.services.excel_import import (
    bulk_insert_items,
    export_questions_to_excel,
    parse_excel_import,
    validate_excel_structure,
)
from app.services.tryout_json_import import (
    TryoutImportError,
    import_tryout_json_snapshot,
    preview_tryout_json_import,
)
router = APIRouter(prefix="/api/v1/import-export", tags=["import-export"])
@@ -55,6 +64,21 @@ def get_website_id_from_header(
)
async def ensure_website_exists(
    website_id: int,
    db: AsyncSession,
) -> None:
    """Raise a 404 when the website row is absent from the database."""
    website = await db.get(Website, website_id)
    if website is not None:
        return
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=(
            f"Website {website_id} not found. Website registration is stored in the database, "
            "not in .env."
        ),
    )
@router.post(
    "/preview",
    summary="Preview Excel import",
@@ -322,3 +346,73 @@ async def export_questions(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Export failed: {str(e)}",
        )
@router.post(
    "/tryout-json/preview",
    summary="Preview Sejoli tryout JSON import",
    description="Parse a Sejoli tryout export JSON file and show snapshot diff without writing to database.",
)
async def preview_tryout_json(
    file: UploadFile = File(..., description="Sejoli tryout export JSON"),
    website_id: int = Depends(get_website_id_from_header),
    db: AsyncSession = Depends(get_db),
) -> dict:
    """Validate the upload, parse its JSON, and return a dry-run snapshot diff.

    Responds 400 for a wrong file extension, an undecodable or invalid JSON
    body, or a structurally invalid payload; 404 when the website is not
    registered in the database.
    """
    if not file.filename or not file.filename.lower().endswith(".json"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="File must be .json format",
        )
    await ensure_website_exists(website_id, db)
    try:
        # Also catch UnicodeDecodeError so a non-UTF-8 upload yields a clean
        # 400 instead of an unhandled 500.
        payload = json.loads((await file.read()).decode("utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid JSON file: {str(e)}",
        ) from e
    try:
        return await preview_tryout_json_import(payload, website_id, db)
    except TryoutImportError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        ) from e
@router.post(
    "/tryout-json",
    summary="Import Sejoli tryout JSON snapshot",
    description="Store Sejoli tryout export JSON as read-only snapshot data and upsert normalized reference questions.",
)
async def import_tryout_json(
    file: UploadFile = File(..., description="Sejoli tryout export JSON"),
    website_id: int = Depends(get_website_id_from_header),
    db: AsyncSession = Depends(get_db),
) -> dict:
    """Persist the uploaded export as a snapshot and upsert reference questions.

    Responds 400 for a wrong file extension, an undecodable or invalid JSON
    body, or a structurally invalid payload; 404 when the website is not
    registered in the database.
    """
    if not file.filename or not file.filename.lower().endswith(".json"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="File must be .json format",
        )
    await ensure_website_exists(website_id, db)
    try:
        # Also catch UnicodeDecodeError so a non-UTF-8 upload yields a clean
        # 400 instead of an unhandled 500.
        payload = json.loads((await file.read()).decode("utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid JSON file: {str(e)}",
        ) from e
    try:
        return await import_tryout_json_snapshot(payload, website_id, db)
    except TryoutImportError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        ) from e

View File

@@ -0,0 +1,341 @@
"""
Importer for Sejoli tryout JSON snapshot payloads.
This importer stores snapshots as read-only reference data. It does not create
or overwrite operational items, because the exported JSON does not currently
contain the full option text needed for the live item bank.
"""
from __future__ import annotations
import hashlib
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models import TryoutImportSnapshot, TryoutSnapshotQuestion, Website
# Identifier recorded in API responses for this importer's payload format.
SOURCE_FORMAT = "sejoli_json"
# Timestamp layout used by the Sejoli export; values are treated as UTC.
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
class TryoutImportError(ValueError):
    """Raised when the incoming payload is structurally invalid.

    Subclasses ValueError so existing callers that catch ValueError
    continue to work; the API layer maps it to an HTTP 400.
    """
@dataclass
class QuestionDiffSummary:
    """Per-tryout counts of how incoming questions compare to stored rows."""

    # Questions present in the incoming payload.
    total_questions: int
    # Incoming questions with no stored counterpart.
    new_questions: int
    # Incoming questions whose content checksum differs from the stored row.
    updated_questions: int
    # Incoming questions identical to the stored row.
    unchanged_questions: int
    # Active stored questions absent from the incoming payload.
    removed_questions: int
    # Incoming questions whose options carry no visible label text.
    missing_option_labels: int
@dataclass
class TryoutPreview:
    """Dry-run summary for a single tryout within the import payload."""

    source_tryout_id: str
    source_key: str
    title: str
    # Permalink from the source 'info' block, when present.
    permalink: str | None
    question_diff: QuestionDiffSummary
    # Human-readable notes surfaced to the API caller.
    warnings: list[str]
def _parse_datetime(value: str | None) -> datetime | None:
    """Parse a source 'YYYY-MM-DD HH:MM:SS' timestamp, interpreting it as UTC.

    Returns None for empty/None input. Raises TryoutImportError (a ValueError
    subclass) on malformed values so the API layer can answer with a 400
    instead of crashing on an unhandled ValueError from strptime.
    """
    if not value:
        return None
    try:
        return datetime.strptime(value, DATETIME_FORMAT).replace(tzinfo=timezone.utc)
    except ValueError as e:
        raise TryoutImportError(f"Invalid datetime value: {value!r}") from e
def _sha256(value: Any) -> str:
payload = json.dumps(value, sort_keys=True, ensure_ascii=False)
return hashlib.sha256(payload.encode("utf-8")).hexdigest()
def _validate_root(payload: dict[str, Any]) -> dict[str, Any]:
if not isinstance(payload, dict):
raise TryoutImportError("Payload must be a JSON object.")
if "tryouts" not in payload or not isinstance(payload["tryouts"], dict) or not payload["tryouts"]:
raise TryoutImportError("Payload must contain a non-empty 'tryouts' object.")
return payload
def _extract_tryout_previews(payload: dict[str, Any]) -> list[tuple[str, dict[str, Any]]]:
return list(payload["tryouts"].items())
def _normalize_question(question: dict[str, Any]) -> dict[str, Any]:
    """Flatten a raw source question into the snapshot reference shape.

    Also attaches a content checksum over the identity-defining fields so the
    import can classify questions as new / updated / unchanged.
    """
    options = question.get("options") or []
    labeled = any(
        bool((option or {}).get("label"))
        for option in options
        if isinstance(option, dict)
    )

    def _clean(key: str) -> str:
        # Coerce missing/None values to "" and strip surrounding whitespace.
        return str(question.get(key) or "").strip()

    normalized: dict[str, Any] = {
        "source_question_id": str(question.get("id", "")),
        "title": _clean("title"),
        "question": _clean("question"),
        "explanation": _clean("explanation") or None,
        "correct_answer": _clean("answer").upper(),
        "category_id": question.get("category_id"),
        "category_name": _clean("category_name") or None,
        "category_code": _clean("category_code") or None,
        "raw_options": options,
        "option_count": len(options),
        "has_option_labels": labeled,
        "raw_payload": question,
    }
    checksum_fields = (
        "title",
        "question",
        "explanation",
        "correct_answer",
        "category_id",
        "category_name",
        "category_code",
        "raw_options",
    )
    normalized["content_checksum"] = _sha256(
        {field: normalized[field] for field in checksum_fields}
    )
    return normalized
async def ensure_website_exists(db: AsyncSession, website_id: int) -> Website:
    """Fetch the website row, raising TryoutImportError when it is missing."""
    lookup = await db.execute(select(Website).where(Website.id == website_id))
    website = lookup.scalar_one_or_none()
    if website is not None:
        return website
    raise TryoutImportError(
        f"Website {website_id} not found. Register the website in the backend first; this is not configured via .env."
    )
async def preview_tryout_json_import(payload: dict[str, Any], website_id: int, db: AsyncSession) -> dict[str, Any]:
    """Compute a dry-run diff of *payload* against stored snapshot questions.

    Validates the payload shape and website existence, then — without writing
    anything — classifies each incoming question per tryout as new, updated,
    or unchanged (by content checksum) and counts active stored questions
    that are absent from the import (removed). Returns a JSON-serializable
    summary with per-tryout details and grand totals.

    Raises TryoutImportError for an invalid payload or unknown website.
    """
    _validate_root(payload)
    await ensure_website_exists(db, website_id)
    tryout_previews: list[TryoutPreview] = []
    total_new = total_updated = total_unchanged = total_removed = total_missing_labels = 0
    for source_key, tryout_payload in _extract_tryout_previews(payload):
        info = tryout_payload.get("info") or {}
        # Fall back to the object key when the export omits id/title.
        source_tryout_id = str(info.get("id") or source_key)
        title = str(info.get("title") or source_key)
        questions = tryout_payload.get("questions") or []
        normalized_questions = [_normalize_question(q) for q in questions]
        existing_result = await db.execute(
            select(TryoutSnapshotQuestion).where(
                TryoutSnapshotQuestion.website_id == website_id,
                TryoutSnapshotQuestion.source_tryout_id == source_tryout_id,
            )
        )
        # Index stored rows by source question id for O(1) lookups below.
        existing_questions = {
            row.source_question_id: row
            for row in existing_result.scalars().all()
        }
        new_questions = updated_questions = unchanged_questions = 0
        missing_option_labels = 0
        incoming_ids: set[str] = set()
        for question in normalized_questions:
            incoming_ids.add(question["source_question_id"])
            existing = existing_questions.get(question["source_question_id"])
            if question["has_option_labels"] is False:
                missing_option_labels += 1
            if existing is None:
                new_questions += 1
            elif existing.content_checksum != question["content_checksum"]:
                updated_questions += 1
            else:
                unchanged_questions += 1
        # Stored questions still flagged active but missing from this import.
        removed_questions = sum(1 for question_id, row in existing_questions.items() if row.is_active and question_id not in incoming_ids)
        warnings: list[str] = []
        if missing_option_labels:
            warnings.append(
                f"{missing_option_labels} question(s) have no visible option labels in the export; import will store raw reference data only."
            )
        summary = QuestionDiffSummary(
            total_questions=len(normalized_questions),
            new_questions=new_questions,
            updated_questions=updated_questions,
            unchanged_questions=unchanged_questions,
            removed_questions=removed_questions,
            missing_option_labels=missing_option_labels,
        )
        total_new += new_questions
        total_updated += updated_questions
        total_unchanged += unchanged_questions
        total_removed += removed_questions
        total_missing_labels += missing_option_labels
        tryout_previews.append(
            TryoutPreview(
                source_tryout_id=source_tryout_id,
                source_key=source_key,
                title=title,
                permalink=info.get("permalink"),
                question_diff=summary,
                warnings=warnings,
            )
        )
    return {
        "source_format": SOURCE_FORMAT,
        "tryout_count": len(tryout_previews),
        "totals": {
            "new_questions": total_new,
            "updated_questions": total_updated,
            "unchanged_questions": total_unchanged,
            "removed_questions": total_removed,
            "missing_option_labels": total_missing_labels,
        },
        "tryouts": [
            {
                "source_tryout_id": preview.source_tryout_id,
                "source_key": preview.source_key,
                "title": preview.title,
                "permalink": preview.permalink,
                "question_diff": preview.question_diff.__dict__,
                "warnings": preview.warnings,
            }
            for preview in tryout_previews
        ],
    }
async def import_tryout_json_snapshot(payload: dict[str, Any], website_id: int, db: AsyncSession) -> dict[str, Any]:
    """Persist *payload* as snapshot rows and upsert reference questions.

    Runs the preview first (which also validates the payload and website),
    then, per tryout: stores one full raw snapshot row, upserts normalized
    questions keyed by (website, tryout, question), and deactivates stored
    questions missing from this import. Flushes but does not commit — the
    caller's session owns the transaction.

    Raises TryoutImportError for an invalid payload or unknown website.
    """
    # Re-validates payload/website and yields the diff echoed in the response.
    preview = await preview_tryout_json_import(payload, website_id, db)
    export_info = payload.get("export_info") or {}
    imported_tryouts: list[dict[str, Any]] = []
    for source_key, tryout_payload in _extract_tryout_previews(payload):
        info = tryout_payload.get("info") or {}
        # Fall back to the object key when the export omits id/title.
        source_tryout_id = str(info.get("id") or source_key)
        title = str(info.get("title") or source_key)
        questions = tryout_payload.get("questions") or []
        results = tryout_payload.get("results") or []
        normalized_questions = [_normalize_question(q) for q in questions]
        snapshot = TryoutImportSnapshot(
            website_id=website_id,
            source_tryout_id=source_tryout_id,
            source_key=source_key,
            title=title,
            source_permalink=info.get("permalink"),
            source_status=info.get("status"),
            exported_at=_parse_datetime(export_info.get("exported_at")),
            source_created_at=_parse_datetime(info.get("created_date")),
            source_modified_at=_parse_datetime(info.get("modified_date")),
            exported_by=export_info.get("exported_by"),
            question_count=len(questions),
            result_count=len(results),
            payload_checksum=_sha256(tryout_payload),
            raw_payload=tryout_payload,
        )
        db.add(snapshot)
        # Flush so snapshot.id is assigned before question rows reference it.
        await db.flush()
        existing_result = await db.execute(
            select(TryoutSnapshotQuestion).where(
                TryoutSnapshotQuestion.website_id == website_id,
                TryoutSnapshotQuestion.source_tryout_id == source_tryout_id,
            )
        )
        existing_questions = {
            row.source_question_id: row
            for row in existing_result.scalars().all()
        }
        now = datetime.now(timezone.utc)
        incoming_ids: set[str] = set()
        new_questions = updated_questions = unchanged_questions = 0
        for question in normalized_questions:
            source_question_id = question["source_question_id"]
            incoming_ids.add(source_question_id)
            existing = existing_questions.get(source_question_id)
            if existing is None:
                # First time this question is seen: insert a reference row.
                row = TryoutSnapshotQuestion(
                    website_id=website_id,
                    source_tryout_id=source_tryout_id,
                    source_question_id=source_question_id,
                    latest_snapshot_id=snapshot.id,
                    question_title=question["title"] or question["question"],
                    question_html=question["question"],
                    explanation_html=question["explanation"],
                    raw_options=question["raw_options"],
                    correct_answer=question["correct_answer"],
                    category_id=question["category_id"],
                    category_name=question["category_name"],
                    category_code=question["category_code"],
                    option_count=question["option_count"],
                    has_option_labels=question["has_option_labels"],
                    is_active=True,
                    content_checksum=question["content_checksum"],
                    raw_payload=question["raw_payload"],
                    last_seen_at=now,
                )
                db.add(row)
                new_questions += 1
                continue
            if existing.content_checksum != question["content_checksum"]:
                # Content changed: overwrite the normalized fields in place.
                existing.question_title = question["title"] or question["question"]
                existing.question_html = question["question"]
                existing.explanation_html = question["explanation"]
                existing.raw_options = question["raw_options"]
                existing.correct_answer = question["correct_answer"]
                existing.category_id = question["category_id"]
                existing.category_name = question["category_name"]
                existing.category_code = question["category_code"]
                existing.option_count = question["option_count"]
                existing.has_option_labels = question["has_option_labels"]
                existing.content_checksum = question["content_checksum"]
                existing.raw_payload = question["raw_payload"]
                updated_questions += 1
            else:
                unchanged_questions += 1
            # Whether changed or not, the row was seen in this snapshot.
            existing.latest_snapshot_id = snapshot.id
            existing.is_active = True
            existing.last_seen_at = now
        # Deactivate (never delete) rows absent from this import.
        removed_questions = 0
        for source_question_id, existing in existing_questions.items():
            if existing.is_active and source_question_id not in incoming_ids:
                existing.is_active = False
                existing.latest_snapshot_id = snapshot.id
                existing.last_seen_at = now
                removed_questions += 1
        imported_tryouts.append(
            {
                "snapshot_id": snapshot.id,
                "source_tryout_id": source_tryout_id,
                "title": title,
                "new_questions": new_questions,
                "updated_questions": updated_questions,
                "unchanged_questions": unchanged_questions,
                "removed_questions": removed_questions,
                "question_count": len(normalized_questions),
            }
        )
    await db.flush()
    return {
        "source_format": SOURCE_FORMAT,
        "website_id": website_id,
        "preview": preview,
        "imported_tryouts": imported_tryouts,
        "message": "Tryout JSON snapshot imported as read-only reference data.",
    }

View File

@@ -0,0 +1,110 @@
import asyncio
from types import SimpleNamespace
from pathlib import Path
import sys
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from app.services.tryout_json_import import preview_tryout_json_import
class DummyScalarResult:
    """Stub for an execute() result exposing scalar_one_or_none()."""

    def __init__(self, value):
        self._wrapped = value

    def scalar_one_or_none(self):
        return self._wrapped
class DummyScalars:
    """Stub mimicking Result.scalars() with an all() accessor."""

    def __init__(self, values):
        self._rows = values

    def all(self):
        return self._rows
class DummyListResult:
    """Stub for an execute() result whose scalars().all() yields rows."""

    def __init__(self, values):
        self._rows = values

    def scalars(self):
        return DummyScalars(self._rows)
class DummySession:
    """Fake AsyncSession that returns queued responses in FIFO order."""

    def __init__(self, responses):
        self._queue = list(responses)

    async def execute(self, _query):
        return self._queue.pop(0)
def test_preview_tryout_json_import_classifies_new_updated_and_removed_questions():
    """Preview should classify questions as new/updated/removed without writes.

    The dummy session feeds two responses in the order the service queries:
    first the website-existence lookup, then the stored snapshot questions
    for the tryout being previewed.
    """
    payload = {
        "export_info": {
            "exported_at": "2026-04-02 09:12:59",
            "exported_by": "Admin",
            "tryout_id": 1038,
        },
        "tryouts": {
            "tryout_1038": {
                "info": {
                    "id": 1038,
                    "title": "Tryout PPDS Obgyn",
                    "permalink": "https://member.example.com/tryout/1038",
                },
                "questions": [
                    {
                        "id": 269,
                        "title": "Question A",
                        "question": "<p>Question A body</p>",
                        "options": [
                            {"increment": "A", "label": "", "value": "0"},
                            {"increment": "B", "label": "", "value": "1"},
                        ],
                        "answer": "B",
                        "explanation": "<p>Because.</p>",
                    },
                    {
                        "id": 270,
                        "title": "Question B new",
                        "question": "<p>Question B body</p>",
                        "options": [
                            {"increment": "A", "label": "", "value": "1"},
                            {"increment": "B", "label": "", "value": "0"},
                        ],
                        "answer": "A",
                        "explanation": "<p>New item.</p>",
                    },
                ],
                "results": [],
            }
        },
    }
    # Stored question 269 has a different checksum -> counted as updated.
    existing_question = SimpleNamespace(
        source_question_id="269",
        content_checksum="old-checksum",
        is_active=True,
    )
    # Stored question 999 is absent from the payload -> counted as removed.
    removed_question = SimpleNamespace(
        source_question_id="999",
        content_checksum="removed-checksum",
        is_active=True,
    )
    db = DummySession(
        [
            DummyScalarResult(SimpleNamespace(id=1)),
            DummyListResult([existing_question, removed_question]),
        ]
    )
    preview = asyncio.run(preview_tryout_json_import(payload, website_id=1, db=db))
    assert preview["tryout_count"] == 1
    assert preview["totals"]["new_questions"] == 1
    assert preview["totals"]["updated_questions"] == 1
    assert preview["totals"]["removed_questions"] == 1
    # Both incoming questions ship empty option labels.
    assert preview["totals"]["missing_option_labels"] == 2
    # The import-success message must not leak into a dry-run preview.
    assert "read-only reference data" not in str(preview)