from __future__ import annotations import json from typing import Any from fastapi import HTTPException from sqlalchemy import Select, func, select from sqlalchemy.ext.asyncio import AsyncSession from app.models import AudioAsset, DisplayModule, Institution, RelatedNews, Report MODULE_META: dict[str, dict[str, Any]] = { "basic_info": {"layer": "p0", "render_mode": "inline", "has_detail_page": True, "is_publish_blocking": True, "requires_human_review": False}, "executive_overview": {"layer": "p0", "render_mode": "card_plus_page", "has_detail_page": True, "is_publish_blocking": True, "requires_human_review": False}, "core_insights": {"layer": "p0", "render_mode": "inline", "has_detail_page": True, "is_publish_blocking": True, "requires_human_review": False}, "key_data": {"layer": "p0", "render_mode": "card_plus_page", "has_detail_page": True, "is_publish_blocking": True, "requires_human_review": False}, "source_compliance": {"layer": "p0", "render_mode": "inline", "has_detail_page": True, "is_publish_blocking": True, "requires_human_review": False}, "differentiated_view": {"layer": "p1", "render_mode": "card_plus_page", "has_detail_page": True, "is_publish_blocking": False, "requires_human_review": False}, "weaknesses": {"layer": "p1", "render_mode": "card_plus_page", "has_detail_page": True, "is_publish_blocking": False, "requires_human_review": False}, "timeline": {"layer": "p1", "render_mode": "card_plus_page", "has_detail_page": True, "is_publish_blocking": False, "requires_human_review": False}, "study_guide": {"layer": "p1", "render_mode": "card_plus_page", "has_detail_page": True, "is_publish_blocking": False, "requires_human_review": False}, "related_sources": {"layer": "p1", "render_mode": "inline", "has_detail_page": True, "is_publish_blocking": False, "requires_human_review": True}, "structure_graph": {"layer": "p1", "render_mode": "card_plus_page", "has_detail_page": True, "is_publish_blocking": False, "requires_human_review": False}, "infographic": {"layer": "p2", "render_mode": "card_plus_page", "has_detail_page": True, "is_publish_blocking": False, "requires_human_review": True}, "audio": {"layer": "p2", "render_mode": "inline", "has_detail_page": True, "is_publish_blocking": False, "requires_human_review": False}, "research_discovery": {"layer": "p2", "render_mode": "card_plus_page", "has_detail_page": True, "is_publish_blocking": False, "requires_human_review": True}, "institution": {"layer": "p0", "render_mode": "inline", "has_detail_page": False, "is_publish_blocking": False, "requires_human_review": False}, } def loads_json(value: str | None, default: Any) -> Any: if not value: return default return json.loads(value) def iso(value: Any) -> str | None: return value.isoformat() if value else None def institution_public(inst: Institution, *, detail: bool = False) -> dict[str, Any]: data = { "institution_id": inst.institution_id, "name_cn": inst.name_cn, "name_en": inst.name_en, "institution_type": inst.institution_type, "source_tier": inst.source_tier, "website_url": inst.website_url, "covered_topics": loads_json(inst.covered_topics, []), "report_count": inst.report_count, "latest_report_at": iso(inst.latest_report_at), "credibility_note": inst.credibility_note, } if detail: data["intro_cn"] = inst.intro_cn return data def institution_card(inst: Institution) -> dict[str, Any]: return { "institution_id": inst.institution_id, "name_cn": inst.name_cn, "name_en": inst.name_en, "source_tier": inst.source_tier, } def report_card(report: Report, inst: Institution) -> dict[str, Any]: return { "report_id": report.report_id, "title_cn": report.title_cn, "subtitle_cn": report.subtitle_cn or "", "one_liner": report.one_liner, "institution": institution_card(inst), "topics": loads_json(report.topics, []), "released_at": iso(report.released_at), "has_audio": report.has_audio, "interpretation_label": report.interpretation_label, "source_tier": report.source_tier, "cache_version": report.cache_version, } def module_payload(module: DisplayModule) -> dict[str, Any]: meta = MODULE_META.get(module.type, {"layer": "p2", "render_mode": "card_plus_page", "has_detail_page": True, "is_publish_blocking": False, "requires_human_review": False}) envelope = loads_json(module.content, {}) render_mode = meta["render_mode"] content = None preview = None if render_mode == "inline": content = envelope.get("content", envelope) else: preview = envelope.get("preview", {}) return { "module_id": module.module_id, "type": module.type, "layer": meta["layer"], "render_mode": render_mode, "has_detail_page": meta["has_detail_page"], "is_publish_blocking": meta["is_publish_blocking"], "requires_human_review": meta["requires_human_review"], "sort_order": module.sort_order, "title_cn": module.title_cn, "content": content, "preview": preview, "content_ref": module.content_ref, "content_etag": module.content_etag, } class CatalogService: def __init__(self, session: AsyncSession) -> None: self.session = session async def _published_report_query(self) -> Select[tuple[Report, Institution]]: return ( select(Report, Institution) .join(Institution, Report.institution_id == Institution.institution_id) .where(Report.display_status == "published", Institution.status == "active") .order_by(Report.released_at.desc(), Report.report_id) ) async def report_cards( self, *, topic: str | None = None, institution_id: str | None = None, has_audio: bool | None = None, source_tier: str | None = None, q: str | None = None, page_size: int = 20, ) -> dict[str, Any]: stmt = await self._published_report_query() if topic: stmt = stmt.where(Report.topics.like(f"%{topic}%")) if institution_id: stmt = stmt.where(Report.institution_id == institution_id) if has_audio is not None: stmt = stmt.where(Report.has_audio == has_audio) if source_tier: stmt = stmt.where(Report.source_tier == source_tier) if q: stmt = stmt.where(Report.title_cn.like(f"%{q}%")) stmt = stmt.limit(min(max(page_size, 1), 50)) rows = (await self.session.execute(stmt)).all() return { "items": [report_card(report, inst) for report, inst in rows], "page": {"next_cursor": None, "has_more": False}, "cache_version": "feed:recommended:seed:v1", } async def report_detail(self, report_id: str) -> dict[str, Any]: row = ( await self.session.execute( select(Report, Institution) .join(Institution, Report.institution_id == Institution.institution_id) .where(Report.report_id == report_id, Report.display_status == "published", Institution.status == "active") ) ).one_or_none() if row is None: raise HTTPException(status_code=404, detail={"error": {"code": "REPORT_NOT_FOUND", "message": "报告不存在或未发布。"}}) report, inst = row modules = ( await self.session.execute( select(DisplayModule) .where(DisplayModule.report_id == report_id, DisplayModule.status == "published") .order_by(DisplayModule.sort_order) ) ).scalars().all() return { "report_id": report.report_id, "title_cn": report.title_cn, "subtitle_cn": report.subtitle_cn or "", "original_title": report.original_title, "one_liner": report.one_liner, "institution": institution_public(inst, detail=True), "source": { "source_url": report.source_url, "source_note": report.source_note, "source_tier": report.source_tier, "published_at": iso(report.published_at), }, "topics": loads_json(report.topics, []), "has_audio": report.has_audio, "interpretation_label": report.interpretation_label, "risk_disclaimer": report.risk_disclaimer, "released_at": iso(report.released_at), "cache_version": report.cache_version, "modules": [module_payload(module) for module in modules], } async def module_detail(self, report_id: str, module_id: str) -> dict[str, Any]: report = ( await self.session.execute(select(Report).where(Report.report_id == report_id, Report.display_status == "published")) ).scalar_one_or_none() if report is None: raise HTTPException(status_code=404, detail={"error": {"code": "REPORT_NOT_FOUND", "message": "报告不存在或未发布。"}}) module = ( await self.session.execute( select(DisplayModule).where(DisplayModule.report_id == report_id, DisplayModule.module_id == module_id, DisplayModule.status == "published") ) ).scalar_one_or_none() if module is None: raise HTTPException(status_code=404, detail={"error": {"code": "MODULE_HIDDEN", "message": "模块隐藏或不可见。"}}) envelope = loads_json(module.content, {}) content = envelope.get("full") or envelope.get("content") or envelope return { "module_id": module.module_id, "type": module.type, "title_cn": module.title_cn, "content": content, "content_etag": module.content_etag, "cache_version": report.cache_version, } async def institutions(self, *, topic: str | None = None, source_tier: str | None = None, page_size: int = 20) -> dict[str, Any]: stmt = select(Institution).where(Institution.status == "active").order_by(Institution.source_tier, Institution.name_cn).limit(min(max(page_size, 1), 50)) if topic: stmt = stmt.where(Institution.covered_topics.like(f"%{topic}%")) if source_tier: stmt = stmt.where(Institution.source_tier == source_tier) rows = (await self.session.execute(stmt)).scalars().all() return {"items": [institution_public(inst) for inst in rows], "page": {"next_cursor": None, "has_more": False}} async def institution_detail(self, institution_id: str) -> dict[str, Any]: inst = (await self.session.execute(select(Institution).where(Institution.institution_id == institution_id, Institution.status == "active"))).scalar_one_or_none() if inst is None: raise HTTPException(status_code=404, detail={"error": {"code": "INSTITUTION_NOT_FOUND", "message": "机构不存在。"}}) reports = await self.report_cards(institution_id=institution_id, page_size=5) detail = institution_public(inst, detail=True) detail["latest_report"] = reports["items"][0] if reports["items"] else None detail["recent_reports"] = reports["items"] return detail async def listen_items(self, *, page_size: int = 20) -> dict[str, Any]: stmt = ( select(AudioAsset, Report, Institution) .join(Report, AudioAsset.report_id == Report.report_id) .join(Institution, Report.institution_id == Institution.institution_id) .where(AudioAsset.status == "published", Report.display_status == "published") .order_by(Report.released_at.desc(), AudioAsset.audio_id) .limit(min(max(page_size, 1), 50)) ) rows = (await self.session.execute(stmt)).all() items = [ { "audio_id": audio.audio_id, "title_cn": audio.title_cn, "duration_sec": audio.duration_sec, "report_id": report.report_id, "report_title_cn": report.title_cn, "institution": institution_card(inst), "released_at": iso(report.released_at), "cache_version": report.cache_version, } for audio, report, inst in rows ] return {"items": items, "page": {"next_cursor": None, "has_more": False}, "cache_version": "listen:seed:v1"} async def seed_counts(self) -> dict[str, int]: models = { "institutions": Institution, "reports": Report, "audio_assets": AudioAsset, "display_modules": DisplayModule, "related_news": RelatedNews, } counts = {} for name, model in models.items(): counts[name] = await self.session.scalar(select(func.count()).select_from(model)) or 0 return counts