Harden auth and persist report schedules

This commit is contained in:
dwindown
2026-06-06 19:40:32 +07:00
parent aaf64264f7
commit fd7989f673
18 changed files with 748 additions and 105 deletions

View File

@@ -0,0 +1,212 @@
import asyncio
from types import SimpleNamespace
import pytest
from fastapi import HTTPException
from app.core import rate_limit
from app.core.config import Settings
from app.models.report_schedule import ReportScheduleModel
from app.services import ai_generation
from app.services.reporting import (
cancel_scheduled_report,
get_scheduled_report,
list_scheduled_reports,
schedule_report,
)
class DummyRequest:
client = SimpleNamespace(host="127.0.0.1")
class DummyScalarResult:
def __init__(self, value):
self._value = value
def scalar_one_or_none(self):
return self._value
def scalar(self):
return self._value
class DummyScalars:
def __init__(self, values):
self._values = values
def all(self):
return self._values
class DummyListResult:
def __init__(self, values):
self._values = values
def scalars(self):
return DummyScalars(self._values)
class DummyRowsResult:
def __init__(self, values):
self._values = values
def all(self):
return self._values
class DummyDb:
def __init__(self, execute_results=None):
self.execute_results = list(execute_results or [])
self.added = []
self.flushed = False
def add(self, row):
self.added.append(row)
async def flush(self):
self.flushed = True
async def execute(self, _query):
return self.execute_results.pop(0)
class DummyRedis:
def __init__(self):
self.calls = 0
async def incr(self, _key):
self.calls += 1
return self.calls
async def expire(self, _key, _seconds):
return True
async def ttl(self, _key):
return 60
def test_ai_stats_accepts_website_scope(monkeypatch):
captured_queries = []
class CaptureDb:
async def execute(self, query):
captured_queries.append(str(query))
if len(captured_queries) == 1:
return DummyScalarResult(0)
return DummyRowsResult([])
asyncio.run(ai_generation.get_ai_stats(CaptureDb(), website_id=9))
assert all("items.website_id" in query for query in captured_queries)
def test_production_init_db_skips_create_all(monkeypatch):
import app.database as database
class ExplodingEngine:
def begin(self):
raise AssertionError("create_all should not run in production")
monkeypatch.setattr(database, "settings", Settings(ENVIRONMENT="production"))
monkeypatch.setattr(database, "engine", ExplodingEngine())
asyncio.run(database.init_db())
def test_rate_limit_uses_redis_and_blocks_when_limit_exceeded(monkeypatch):
dummy_redis = DummyRedis()
rate_limit.reset_rate_limit_state()
monkeypatch.setattr(rate_limit, "_get_redis_client", lambda: dummy_redis)
asyncio.run(
rate_limit.enforce_rate_limit(
DummyRequest(),
scope="test.redis",
max_requests=1,
window_seconds=60,
)
)
with pytest.raises(HTTPException) as exc_info:
asyncio.run(
rate_limit.enforce_rate_limit(
DummyRequest(),
scope="test.redis",
max_requests=1,
window_seconds=60,
)
)
assert exc_info.value.status_code == 429
def test_rate_limit_falls_back_to_memory_when_redis_unavailable(monkeypatch):
rate_limit.reset_rate_limit_state()
monkeypatch.setattr(rate_limit, "_get_redis_client", lambda: None)
asyncio.run(
rate_limit.enforce_rate_limit(
DummyRequest(),
scope="test.memory",
max_requests=1,
window_seconds=60,
)
)
with pytest.raises(HTTPException) as exc_info:
asyncio.run(
rate_limit.enforce_rate_limit(
DummyRequest(),
scope="test.memory",
max_requests=1,
window_seconds=60,
)
)
assert exc_info.value.status_code == 429
def test_schedule_report_persists_model_row():
db = DummyDb()
schedule_id = asyncio.run(
schedule_report(
db,
report_type="student_performance",
schedule="daily",
tryout_ids=["t1"],
website_id=3,
recipients=["ops@example.com"],
export_format="xlsx",
)
)
assert db.flushed is True
assert isinstance(db.added[0], ReportScheduleModel)
assert db.added[0].schedule_id == schedule_id
assert db.added[0].website_id == 3
def test_schedule_helpers_read_list_and_soft_cancel():
row = ReportScheduleModel(
schedule_id="sched-1",
report_type="student_performance",
schedule="daily",
tryout_ids=["t1"],
website_id=3,
recipients=["ops@example.com"],
format="xlsx",
is_active=True,
)
get_db = DummyDb([DummyScalarResult(row)])
listed_db = DummyDb([DummyListResult([row])])
cancel_db = DummyDb([DummyScalarResult(row)])
got = asyncio.run(get_scheduled_report(get_db, "sched-1"))
listed = asyncio.run(list_scheduled_reports(listed_db, website_id=3))
cancelled = asyncio.run(cancel_scheduled_report(cancel_db, "sched-1"))
assert got.schedule_id == "sched-1"
assert listed[0].website_id == 3
assert cancelled is True
assert row.is_active is False

View File

@@ -0,0 +1,132 @@
import asyncio
import inspect
from datetime import datetime, timezone
from fastapi.params import Depends
from app.api.v1.session import SubmitAnswerResponse
from app.core.auth import AuthContext, get_auth_context
from app.routers import admin as admin_router
from app.routers import reports as reports_router
from app.routers import wordpress as wordpress_router
from app.schemas.session import SessionCompleteResponse, UserAnswerOutput
from app.services.reporting import AggregatePerformanceStats, StudentPerformanceReport
def _depends_on_auth(callable_obj, parameter_name: str = "auth") -> bool:
parameter = inspect.signature(callable_obj).parameters[parameter_name]
default = parameter.default
return isinstance(default, Depends) and default.dependency is get_auth_context
def test_admin_actions_require_signed_auth_context():
assert _depends_on_auth(admin_router.admin_trigger_calibration)
assert _depends_on_auth(admin_router.admin_toggle_ai_generation)
assert _depends_on_auth(admin_router.admin_reset_normalization)
def test_wordpress_user_lookup_routes_require_signed_auth_context():
assert _depends_on_auth(wordpress_router.get_website_users)
assert _depends_on_auth(wordpress_router.get_user_endpoint)
def test_wordpress_roles_map_to_api_admin_roles():
assert wordpress_router._api_role_from_wordpress_roles(["subscriber"]) == "student"
assert wordpress_router._api_role_from_wordpress_roles(["administrator"]) == "admin"
assert wordpress_router._api_role_from_wordpress_roles(["super_admin"]) == "system_admin"
def test_adaptive_submit_response_does_not_expose_answer_key_or_correctness():
payload = SubmitAnswerResponse(theta=0.12, theta_se=0.8).model_dump()
assert "is_correct" not in payload
assert "correct_answer" not in payload
assert "explanation" not in payload
def test_session_completion_answer_output_does_not_expose_correctness():
answer_payload = UserAnswerOutput(
id=1,
item_id=10,
response="A",
time_spent=12,
bobot_earned=0.5,
scoring_mode_used="ctt",
).model_dump()
assert "is_correct" not in answer_payload
response_payload = SessionCompleteResponse(
id=1,
session_id="s-1",
wp_user_id="wp-1",
website_id=2,
tryout_id="tryout-1",
start_time=datetime.now(timezone.utc),
end_time=datetime.now(timezone.utc),
is_completed=True,
scoring_mode_used="ctt",
total_benar=1,
total_bobot_earned=0.5,
NM=500,
NN=500,
rataan_used=500,
sb_used=100,
user_answers=[
UserAnswerOutput(
id=1,
item_id=10,
response="A",
time_spent=12,
bobot_earned=0.5,
scoring_mode_used="ctt",
)
],
).model_dump()
assert "is_correct" not in response_payload["user_answers"][0]
def test_student_performance_report_is_scoped_to_student_user(monkeypatch):
captured = {}
async def fake_generate_student_performance_report(**kwargs):
captured.update(kwargs)
return StudentPerformanceReport(
generated_at=datetime.now(timezone.utc),
tryout_id=kwargs["tryout_id"],
website_id=kwargs["website_id"],
date_range=kwargs["date_range"],
aggregate=AggregatePerformanceStats(
tryout_id=kwargs["tryout_id"],
participant_count=0,
avg_nm=None,
std_nm=None,
min_nm=None,
max_nm=None,
median_nm=None,
avg_nn=None,
std_nn=None,
avg_theta=None,
pass_rate=0.0,
avg_time_spent=0.0,
),
individual_records=[],
)
monkeypatch.setattr(
reports_router,
"generate_student_performance_report",
fake_generate_student_performance_report,
)
asyncio.run(
reports_router.get_student_performance_report(
tryout_id="tryout-1",
db=object(),
auth=AuthContext(website_id=5, role="student", wp_user_id="wp-1"),
)
)
assert captured["website_id"] == 5
assert captured["wp_user_id"] == "wp-1"