diff --git a/app/ui/portfolio_config.py b/app/ui/portfolio_config.py
index 1b73fc8..2b58bf5 100644
--- a/app/ui/portfolio_config.py
+++ b/app/ui/portfolio_config.py
@@ -146,7 +146,7 @@ def render_portfolio_config() -> None:
 
             st.bar_chart(
                 positions_df.set_index("股票")["持仓比例"],
-                use_container_width=True
+                width="stretch"
             )
 
     with col2:
@@ -158,5 +158,5 @@ def render_portfolio_config() -> None:
 
             st.bar_chart(
                 sectors_df.set_index("行业")["敞口"],
-                use_container_width=True
+                width="stretch"
             )
diff --git a/app/ui/views/tests.py b/app/ui/views/tests.py
index 5e86f35..5930cac 100644
--- a/app/ui/views/tests.py
+++ b/app/ui/views/tests.py
@@ -1,9 +1,11 @@
 """自检测试视图。"""
 from __future__ import annotations
 
+from collections import Counter
 from datetime import date
 
 import streamlit as st
+import pandas as pd
 
 from app.data.schema import initialize_database
 from app.ingest.checker import run_boot_check
@@ -11,6 +13,7 @@ from app.ingest.tushare import FetchJob, run_ingestion
 from app.llm.client import llm_config_snapshot, run_llm
 from app.utils import alerts
 from app.utils.config import get_config, save_config
+from app.utils.data_quality import run_data_quality_checks
 from app.ui.shared import LOGGER, LOG_EXTRA
 from app.ui.views.dashboard import update_dashboard_sidebar
 
@@ -202,3 +205,50 @@ def render_tests() -> None:
             LOGGER.info("LLM 测试成功", extra=LOG_EXTRA)
             st.success("LLM 调用成功,以下为返回内容:")
             st.write(response)
+
+    st.divider()
+    st.subheader("数据质量验证")
+    st.write("快速检查候选池、策略评估、持仓快照与新闻数据的更新情况。")
+    dq_window = int(
+        st.number_input(
+            "数据更新窗口(天)",
+            min_value=3,
+            max_value=60,
+            value=7,
+            step=1,
+            help="用于判断数据是否过期的时间窗口。",
+        )
+    )
+    if st.button("运行数据质量验证"):
+        LOGGER.info("执行数据质量验证 window_days=%s", dq_window, extra=LOG_EXTRA)
+        with st.spinner("正在执行数据质量检查..."):
+            results = run_data_quality_checks(window_days=dq_window)
+
+        level_counts = Counter(item.severity for item in results)
+        metric_cols = st.columns(3)
+        metric_cols[0].metric("错误", level_counts.get("ERROR", 0))
+        metric_cols[1].metric("警告", level_counts.get("WARN", 0))
+        metric_cols[2].metric("提示", level_counts.get("INFO", 0))
+
+        if not results:
+            st.info("未返回任何检查结果。")
+        else:
+            df = pd.DataFrame(
+                [
+                    {
+                        "检查项": item.check,
+                        "级别": item.severity,
+                        "说明": item.detail,
+                        "附加信息": item.extras or {},
+                    }
+                    for item in results
+                ]
+            )
+            st.dataframe(df, hide_index=True, width="stretch")
+            with st.expander("导出检查结果", expanded=False):
+                st.download_button(
+                    "下载 CSV",
+                    data=df.to_csv(index=False),
+                    file_name="data_quality_results.csv",
+                    mime="text/csv",
+                )
diff --git a/app/ui/views/today.py b/app/ui/views/today.py
index 3602b82..7f46897 100644
--- a/app/ui/views/today.py
+++ b/app/ui/views/today.py
@@ -13,8 +13,10 @@ import streamlit as st
 from app.backtest.engine import BacktestEngine, PortfolioState, BtConfig
 from app.utils.portfolio import (
     InvestmentCandidate,
-    get_candidate_pool,
+    PortfolioPosition,
     get_portfolio_settings_snapshot,
+    list_investment_pool,
+    list_positions,
 )
 from app.utils.db import db_session
 
@@ -42,6 +44,48 @@ def _parse_trade_date(trade_date: str | int | date) -> date:
         raise ValueError(f"无法解析交易日:{trade_date}") from exc
 
 
+def _resolve_trade_date_variants(trade_date: str | int | date) -> List[str]:
+    """Return possible string representations for the given trade date."""
+    variants: List[str] = []
+    str_val = str(trade_date).strip()
+    if str_val:
+        variants.append(str_val)
+    try:
+        dt = _parse_trade_date(trade_date)
+    except ValueError:
+        dt = None
+    if dt is not None:
+        variants.extend(
+            [
+                dt.strftime("%Y%m%d"),
+                dt.isoformat(),
+            ]
+        )
+    # Deduplicate while preserving the original order.
+    seen = set()
+    unique_variants: List[str] = []
+    for item in variants:
+        if item and item not in seen:
+            unique_variants.append(item)
+            seen.add(item)
+    return unique_variants
+
+
+def _load_candidate_records(
+    trade_date: str | int | date,
+) -> tuple[List[InvestmentCandidate], Optional[str], bool]:
+    """Load candidate pool records trying multiple trade date representations."""
+    variants = _resolve_trade_date_variants(trade_date)
+    for variant in variants:
+        records = list_investment_pool(trade_date=variant)
+        if records:
+            return records, variant, False
+    latest_records = list_investment_pool()
+    if latest_records:
+        return latest_records, latest_records[0].trade_date, True
+    return [], None, False
+
+
 def _fetch_agent_actions(trade_date: str, symbols: List[str]) -> Dict[str, Dict[str, Optional[str]]]:
     unique_symbols = list(dict.fromkeys(symbols))
     if not unique_symbols:
@@ -205,31 +249,40 @@ def render_today_plan() -> None:
             """,
             (trade_date,),
         ).fetchall()
-    symbols = [row["ts_code"] for row in code_rows]
+    agent_symbols = [row["ts_code"] for row in code_rows]
 
-    candidate_records, fallback_used = get_candidate_pool(trade_date=trade_date)
+    candidate_records, candidate_data_date, candidate_fallback = _load_candidate_records(trade_date)
     if candidate_records:
-        message = (
-            f"候选池包含 {len(candidate_records)} 个标的:"
-            + "、".join(item.ts_code for item in candidate_records[:12])
-            + ("…" if len(candidate_records) > 12 else "")
+        preview_codes = "、".join(item.ts_code for item in candidate_records[:12])
+        if len(candidate_records) > 12:
+            preview_codes += "…"
+        source_note = ""
+        if candidate_data_date and candidate_data_date != trade_date:
+            source_note = f"(基于 {candidate_data_date} 数据)"
+        elif candidate_fallback:
+            source_note = "(使用最近候选池)"
+        st.caption(
+            f"候选池包含 {len(candidate_records)} 个标的:{preview_codes}{source_note}"
         )
-        if fallback_used:
-            message += "(使用最新候选池)"
-        st.caption(message)
-
-    if candidate_records:
-        candidate_codes = [item.ts_code for item in candidate_records]
-        symbols = list(dict.fromkeys(candidate_codes + symbols))
     else:
-        st.caption("所选日期暂无候选池数据,仍可查看代理决策记录。")
+        st.caption("所选日期暂无候选池数据,仍可查看代理决策记录或触发重评估。")
+
+    candidate_codes = [item.ts_code for item in candidate_records]
+    positions = list_positions(active_only=True)
+    position_map: Dict[str, PortfolioPosition] = {pos.ts_code: pos for pos in positions}
+    position_codes = [pos.ts_code for pos in positions]
+
+    symbols = list(
+        dict.fromkeys(candidate_codes + agent_symbols + position_codes)
+    )
 
     with actions_col:
-        metrics_cols = st.columns(2)
-        metrics_cols[0].metric("标的数量", len(symbols))
+        metrics_cols = st.columns(3)
+        metrics_cols[0].metric("已产出决策数", len(agent_symbols))
         metrics_cols[1].metric("候选池标的", len(candidate_records))
-        st.caption("一键触发策略重评估(包含当前交易日的所有标的)。")
-        if st.button("一键重评估全部", type="primary", use_container_width=True):
+        metrics_cols[2].metric("当前持仓", len(position_codes))
+        st.caption("对当前交易日所有标的触发策略重评估。")
+        if st.button("一键重评估全部", type="primary", width="stretch"):
             with st.spinner("正在对所有标的进行重评估,请稍候..."):
                 try:
                     trade_date_obj = _parse_trade_date(trade_date)
@@ -251,87 +304,117 @@ def render_today_plan() -> None:
                     LOGGER.exception("一键重评估失败", extra=LOG_EXTRA)
                     st.error(f"一键重评估执行过程中发生错误:{exc}")
 
-    detail_tab, assistant_tab = st.tabs(["标的详情", "投资助理模式"])
-    with assistant_tab:
-        _render_today_plan_assistant_view(trade_date, candidate_records)
+    overview_tab, detail_tab = st.tabs(["组合总览", "标的详情"])
+    with overview_tab:
+        _render_today_plan_assistant_view(
+            trade_date,
+            candidate_records,
+            candidate_data_date,
+            candidate_fallback,
+            positions,
+        )
 
     with detail_tab:
         if not symbols:
             st.info("所选交易日暂无 agent_utils 记录。")
         else:
-            _render_today_plan_symbol_view(trade_date, symbols, query, candidate_records)
+            _render_today_plan_symbol_view(
+                trade_date,
+                symbols,
+                query,
+                candidate_records,
+                position_map,
+            )
 
 
 def _render_today_plan_assistant_view(
     trade_date: str | int | date,
     candidate_records: List[InvestmentCandidate],
+    candidate_data_date: Optional[str],
+    fallback_used: bool,
+    positions: List[PortfolioPosition],
 ) -> None:
-    # 确保日期格式为字符串
-    if isinstance(trade_date, date):
-        trade_date = trade_date.strftime("%Y%m%d")
-    st.info("已开启投资助理模式:以下内容为组合级(去标的)建议,不包含任何具体标的代码。")
-    try:
-        candidates = candidate_records or list_investment_pool(trade_date=trade_date)
-        if candidates:
-            scores = [float(item.score or 0.0) for item in candidates]
-            statuses = [item.status or "UNKNOWN" for item in candidates]
-            tags: List[str] = []
-            rationales: List[str] = []
-            for item in candidates:
-                if getattr(item, "tags", None):
-                    tags.extend(item.tags)
-                if getattr(item, "rationale", None):
-                    rationales.append(str(item.rationale))
-            cnt = Counter(statuses)
-            tag_cnt = Counter(tags)
-            st.subheader("候选池聚合概览(已匿名化)")
-            col_a, col_b, col_c = st.columns(3)
-            col_a.metric("候选数", f"{len(candidates)}")
-            col_b.metric("平均评分", f"{np.mean(scores):.3f}" if scores else "-")
-            col_c.metric("中位评分", f"{np.median(scores):.3f}" if scores else "-")
+    display_date = candidate_data_date or str(trade_date)
+    st.info("组合总览聚焦候选池与仓位概况,用于快速判断今日是否需要调整策略。")
+    if fallback_used and candidate_data_date and candidate_data_date != str(trade_date):
+        st.caption(f"候选池数据来自 {candidate_data_date}(最近可用日期)。")
+    elif candidate_data_date and candidate_data_date != str(trade_date):
+        st.caption(f"候选池数据基于 {candidate_data_date}。")
 
-            st.write("状态分布:")
-            st.json(dict(cnt))
+    candidates = candidate_records
+    if not candidates:
+        st.warning("尚未生成候选池,请先运行回测/评估流程或触发上方的一键重评估。")
+        return
 
-            if tag_cnt:
-                st.write("常见标签(示例):")
-                st.json(dict(tag_cnt.most_common(10)))
+    scores = [float(item.score or 0.0) for item in candidates]
+    statuses = [item.status or "UNKNOWN" for item in candidates]
+    tags: List[str] = []
+    rationales: List[str] = []
+    for item in candidates:
+        if getattr(item, "tags", None):
+            tags.extend(item.tags)
+        if getattr(item, "rationale", None):
+            rationales.append(str(item.rationale))
 
-            if rationales:
-                st.write("汇总理由(节选,不含代码):")
-                seen = set()
-                excerpts = []
-                for rationale in rationales:
-                    text = rationale.strip()
-                    if text and text not in seen:
-                        seen.add(text)
-                        excerpts.append(text)
-                    if len(excerpts) >= 3:
-                        break
-                for idx, excerpt in enumerate(excerpts, start=1):
-                    st.markdown(f"**理由 {idx}:** {excerpt}")
+    cnt = Counter(statuses)
+    tag_cnt = Counter(tags)
 
-            avg_score = float(np.mean(scores)) if scores else 0.0
-            suggest_pct = max(0.0, min(0.3, 0.10 + (avg_score - 0.5) * 0.2))
-            st.subheader("组合级建议(不指定标的)")
-            st.write(
-                f"基于候选池平均评分 {avg_score:.3f},建议今日用于新增买入的现金比例约为 {suggest_pct:.0%}。"
-            )
-            st.write(
-                "建议分配思路:在候选池中挑选若干得分较高的标的按目标权重等比例分配,或以分批买入的方式分摊入场时点。"
-            )
-            if st.button("生成组合级操作建议(仅输出,不执行)"):
-                st.success("已生成组合级建议(仅供参考)。")
-                st.write({
-                    "候选数": len(candidates),
-                    "平均评分": avg_score,
-                    "建议新增买入比例": f"{suggest_pct:.0%}",
-                })
-        else:
-            st.info("所选交易日暂无候选投资池数据。")
-    except Exception:  # noqa: BLE001
-        LOGGER.exception("加载候选池聚合信息失败", extra=LOG_EXTRA)
-        st.error("加载候选池数据时发生错误。")
+    col_a, col_b, col_c = st.columns(3)
+    col_a.metric("候选数", f"{len(candidates)}")
+    col_b.metric("平均评分", f"{np.mean(scores):.3f}" if scores else "-")
+    col_c.metric("中位评分", f"{np.median(scores):.3f}" if scores else "-")
+
+    if positions:
+        total_market_value = sum(float(pos.market_value or 0.0) for pos in positions)
+        position_cols = st.columns(3)
+        position_cols[0].metric("持仓数", len(positions))
+        position_cols[1].metric(
+            "持仓总市值",
+            f"{total_market_value:,.0f}" if total_market_value else "-",
+        )
+        exposure = sum(float(pos.target_weight or 0.0) for pos in positions)
+        position_cols[2].metric("目标权重合计", f"{exposure:.0%}" if exposure else "-")
+
+    st.write("状态分布:")
+    st.json(dict(cnt))
+
+    if tag_cnt:
+        st.write("常见标签(示例):")
+        st.json(dict(tag_cnt.most_common(10)))
+
+    if rationales:
+        st.write("汇总理由(节选,不含代码):")
+        seen = set()
+        excerpts = []
+        for rationale in rationales:
+            text = rationale.strip()
+            if text and text not in seen:
+                seen.add(text)
+                excerpts.append(text)
+            if len(excerpts) >= 3:
+                break
+        for idx, excerpt in enumerate(excerpts, start=1):
+            st.markdown(f"**理由 {idx}:** {excerpt}")
+
+    avg_score = float(np.mean(scores)) if scores else 0.0
+    suggest_pct = max(0.0, min(0.3, 0.10 + (avg_score - 0.5) * 0.2))
+    st.subheader("组合级建议(不指定标的)")
+    st.write(
+        f"基于候选池平均评分 {avg_score:.3f},建议今日用于新增买入的现金比例约为 {suggest_pct:.0%}。"
+    )
+    st.write(
+        "建议分配思路:在候选池中挑选若干得分较高的标的按目标权重等比例分配,或以分批买入的方式分摊入场时点。"
+    )
+    if st.button("生成组合级操作建议(仅输出,不执行)"):
+        st.success("已生成组合级建议(仅供参考)。")
+        st.write(
+            {
+                "候选数": len(candidates),
+                "平均评分": avg_score,
+                "建议新增买入比例": f"{suggest_pct:.0%}",
+                "候选数据日期": display_date,
+            }
+        )
 
 
 def _render_today_plan_symbol_view(
@@ -339,6 +422,7 @@ def _render_today_plan_symbol_view(
     symbols: List[str],
     query_params: Dict[str, List[str]],
     candidate_records: List[InvestmentCandidate],
+    position_map: Dict[str, PortfolioPosition],
 ) -> None:
     default_ts = query_params.get("code", [symbols[0]])[0]
     try:
@@ -363,12 +447,9 @@ def _render_today_plan_symbol_view(
             (trade_date, ts_code),
         ).fetchall()
 
-    if not rows:
-        st.info("未查询到详细决策记录,稍后再试。")
-        return
-
     candidate_map = {item.ts_code: item for item in candidate_records}
     candidate_info = candidate_map.get(ts_code)
+    position_info = position_map.get(ts_code)
     if candidate_info:
         info_cols = st.columns(4)
         info_cols[0].metric("候选评分", f"{(candidate_info.score or 0):.3f}")
@@ -377,6 +458,30 @@ def _render_today_plan_symbol_view(
         info_cols[3].metric("行业", candidate_info.industry or "-")
         if candidate_info.rationale:
             st.caption(f"候选理由:{candidate_info.rationale}")
+    if position_info:
+        pos_cols = st.columns(4)
+        pos_cols[0].metric("持仓数量", f"{position_info.quantity:.0f}")
+        pos_cols[1].metric(
+            "成本价",
+            f"{position_info.cost_price:.2f}",
+        )
+        market_price = position_info.market_price
+        pos_cols[2].metric("市价", f"{market_price:.2f}" if market_price else "-")
+        pnl = (position_info.unrealized_pnl or 0.0) + (position_info.realized_pnl or 0.0)
+        pos_cols[3].metric("累计盈亏", f"{pnl:,.0f}")
+        if position_info.target_weight is not None:
+            st.caption(f"目标权重:{position_info.target_weight:.0%}")
+
+    _render_today_plan_reevaluation_controls(
+        trade_date,
+        ts_code,
+        batch_symbols,
+        add_divider=False,
+    )
+
+    if not rows:
+        st.info("该标的尚未生成部门/代理决策,可尝试触发重评估。")
+        return
 
     try:
         feasible_actions = json.loads(rows[0]["feasible"] or "[]")
@@ -685,11 +790,19 @@ def _render_today_plan_symbol_view(
     st.divider()
     st.info("投资池与仓位概览已移至单独页面。请在侧边或页面导航中选择“投资池/仓位”以查看详细信息。")
 
-    st.divider()
+def _render_today_plan_reevaluation_controls(
+    trade_date: str | int | date,
+    ts_code: str,
+    batch_symbols: List[str],
+    *,
+    add_divider: bool = True,
+) -> None:
+    if add_divider:
+        st.divider()
     st.subheader("策略重评估")
     st.caption("对当前选中的交易日与标的,立即触发一次策略评估并回写 agent_utils。")
     cols_re = st.columns([1, 1])
-    if cols_re[0].button("对该标的重评估", key="reevaluate_current_symbol"):
+    if cols_re[0].button("对该标的重评估", key=f"reevaluate_{ts_code}"):
         with st.spinner("正在重评估..."):
             try:
                 trade_date_obj = _parse_trade_date(trade_date)
@@ -713,7 +826,12 @@ def _render_today_plan_symbol_view(
             except Exception as exc:  # noqa: BLE001
                 LOGGER.exception("重评估失败", extra=LOG_EXTRA)
                 st.error(f"重评估失败:{exc}")
-    if cols_re[1].button("批量重评估(所选)", key="reevaluate_batch", disabled=not batch_symbols):
+    disabled = not batch_symbols
+    if cols_re[1].button(
+        "批量重评估(所选)",
+        key="reevaluate_batch_symbols",
+        disabled=disabled,
+    ):
         with st.spinner("批量重评估执行中..."):
             try:
                 trade_date_obj = _parse_trade_date(trade_date)
diff --git a/app/utils/data_quality.py b/app/utils/data_quality.py
new file mode 100644
index 0000000..f2f6df8
--- /dev/null
+++ b/app/utils/data_quality.py
@@ -0,0 +1,384 @@
+"""Utility helpers for performing lightweight data quality checks."""
+from __future__ import annotations
+
+from dataclasses import dataclass
+from datetime import date, datetime, timedelta
+from typing import Dict, Iterable, List, Optional
+
+from app.utils.db import db_session
+from app.utils.logging import get_logger
+
+LOGGER = get_logger(__name__)
+LOG_EXTRA = {"stage": "data_quality"}
+
+Severity = str  # Literal["ERROR", "WARN", "INFO"] (avoid importing Literal for py<3.8)
+
+
+@dataclass
+class DataQualityResult:
+    """Outcome of one data quality check: name, severity, message, optional extras."""
+    check: str
+    severity: Severity
+    detail: str
+    extras: Optional[Dict[str, object]] = None
+
+
+def _parse_date(value: object) -> Optional[date]:
+    """Best-effort parse for trade_date columns stored as str/int."""
+    if value is None:
+        return None
+    if isinstance(value, datetime):
+        # datetime subclasses date; return a plain date so callers can compare with date.
+        return value.date()
+    if isinstance(value, date):
+        return value
+    text = str(value).strip()
+    if not text:
+        return None
+    for fmt in ("%Y-%m-%d", "%Y%m%d"):
+        try:
+            return datetime.strptime(text, fmt).date()
+        except ValueError:
+            continue
+    try:
+        return datetime.fromisoformat(text).date()
+    except ValueError:
+        LOGGER.debug("无法解析日期字段 value=%s", value, extra=LOG_EXTRA)
+        return None
+
+
+def run_data_quality_checks(*, window_days: int = 7) -> List[DataQualityResult]:
+    """Execute a suite of lightweight data quality checks.
+
+    Args:
+        window_days: Age threshold in days used by the candidate pool,
+            snapshot and news freshness checks.
+
+    Returns:
+        The collected DataQualityResult entries in check order. Query
+        failures are logged and reported as results instead of being raised.
+    """
+    results: List[DataQualityResult] = []
+    today = date.today()
+    window_start = today - timedelta(days=window_days)
+
+    try:
+        with db_session(read_only=True) as conn:
+            # 1. Latest candidate pool data
+            try:
+                # Scope the aggregate to the latest trade_date; without it the
+                # count would span every date stored in the table.
+                row = conn.execute(
+                    """
+                    SELECT MAX(p.trade_date) AS trade_date, COUNT(DISTINCT p.ts_code) AS cnt
+                    FROM investment_pool AS p
+                    WHERE p.trade_date = (SELECT MAX(trade_date) FROM investment_pool)
+                    """
+                ).fetchone()
+            except Exception:  # noqa: BLE001
+                LOGGER.exception("查询 investment_pool 失败", extra=LOG_EXTRA)
+                results.append(
+                    DataQualityResult(
+                        "候选池可用性",
+                        "ERROR",
+                        "读取候选池数据失败,请检查 investment_pool 表结构与权限。",
+                    )
+                )
+                row = None
+
+            latest_candidate_date = _parse_date(row["trade_date"]) if row else None
+            candidate_count = int(row["cnt"]) if row and row["cnt"] is not None else 0
+            if row is None:
+                pass
+            elif latest_candidate_date is None:
+                results.append(
+                    DataQualityResult(
+                        "候选池可用性",
+                        "ERROR",
+                        "未解析到候选池日期,请确认 trade_date 字段格式。",
+                        {"raw_value": row["trade_date"]},
+                    )
+                )
+            else:
+                age = (today - latest_candidate_date).days
+                extras = {
+                    "trade_date": latest_candidate_date.isoformat(),
+                    "candidate_count": candidate_count,
+                    "age_days": age,
+                }
+                if candidate_count == 0:
+                    results.append(
+                        DataQualityResult(
+                            "候选池可用性",
+                            "ERROR",
+                            f"{latest_candidate_date} 的候选池为空。",
+                            extras,
+                        )
+                    )
+                elif age > window_days:
+                    results.append(
+                        DataQualityResult(
+                            "候选池可用性",
+                            "WARN",
+                            f"候选池停留在 {latest_candidate_date},已超过 {window_days} 天未更新。",
+                            extras,
+                        )
+                    )
+                else:
+                    results.append(
+                        DataQualityResult(
+                            "候选池可用性",
+                            "INFO",
+                            f"最新候选池({latest_candidate_date})包含 {candidate_count} 个标的。",
+                            extras,
+                        )
+                    )
+
+            # 2. agent_utils coverage
+            try:
+                # Same latest-date scoping as the candidate pool query above.
+                row_agent = conn.execute(
+                    """
+                    SELECT MAX(a.trade_date) AS trade_date, COUNT(DISTINCT a.ts_code) AS cnt
+                    FROM agent_utils AS a
+                    WHERE a.trade_date = (SELECT MAX(trade_date) FROM agent_utils)
+                    """
+                ).fetchone()
+            except Exception:  # noqa: BLE001
+                LOGGER.exception("查询 agent_utils 失败", extra=LOG_EXTRA)
+                results.append(
+                    DataQualityResult(
+                        "策略评估数据",
+                        "ERROR",
+                        "读取 agent_utils 失败,无法评估部门/代理数据是否可用。",
+                    )
+                )
+                row_agent = None
+
+            latest_agent_date = _parse_date(row_agent["trade_date"]) if row_agent else None
+            agent_count = int(row_agent["cnt"]) if row_agent and row_agent["cnt"] is not None else 0
+            if row_agent is None:
+                pass
+            elif latest_agent_date is None:
+                results.append(
+                    DataQualityResult(
+                        "策略评估数据",
+                        "ERROR",
+                        "未解析到 agent_utils 日期,请确认 trade_date 字段格式。",
+                        {"raw_value": row_agent["trade_date"]},
+                    )
+                )
+            else:
+                extras = {
+                    "trade_date": latest_agent_date.isoformat(),
+                    "decision_count": agent_count,
+                }
+                if agent_count == 0:
+                    results.append(
+                        DataQualityResult(
+                            "策略评估数据",
+                            "WARN",
+                            f"{latest_agent_date} 的 agent_utils 中未找到标的记录。",
+                            extras,
+                        )
+                    )
+                else:
+                    results.append(
+                        DataQualityResult(
+                            "策略评估数据",
+                            "INFO",
+                            f"{latest_agent_date} 共有 {agent_count} 个标的完成策略评估。",
+                            extras,
+                        )
+                    )
+
+            # latest_agent_date may be None (empty/unreadable agent_utils); still warn.
+            if latest_candidate_date and latest_candidate_date != latest_agent_date:
+                results.append(
+                    DataQualityResult(
+                        "候选与策略同步",
+                        "WARN",
+                        "候选池与策略评估日期不一致,建议重新触发评估或数据同步。",
+                        {
+                            "candidate_date": latest_candidate_date.isoformat(),
+                            "agent_date": (
+                                latest_agent_date.isoformat() if latest_agent_date else None
+                            ),
+                        },
+                    )
+                )
+
+            # 3. Open positions vs. snapshots
+            try:
+                open_positions = conn.execute(
+                    """
+                    SELECT COUNT(*) AS cnt
+                    FROM portfolio_positions
+                    WHERE status = 'open'
+                    """
+                ).fetchone()
+                open_position_count = int(open_positions["cnt"]) if open_positions else 0
+            except Exception:  # noqa: BLE001
+                LOGGER.exception("查询 portfolio_positions 失败", extra=LOG_EXTRA)
+                open_position_count = 0
+                results.append(
+                    DataQualityResult(
+                        "持仓数据",
+                        "WARN",
+                        "无法读取当前持仓,检查 portfolio_positions 表是否存在。",
+                    )
+                )
+
+            latest_snapshot_date = None
+            snapshot_date_column = None
+            try:
+                snapshot_info = conn.execute("PRAGMA table_info(portfolio_snapshots)").fetchall()
+            except Exception:  # noqa: BLE001
+                LOGGER.exception("读取 portfolio_snapshots 结构失败", extra=LOG_EXTRA)
+                snapshot_info = []
+
+            preferred_snapshot_columns: Iterable[str] = (
+                "as_of",
+                "snapshot_date",
+                "trade_date",
+                "date",
+                "created_at",
+                "timestamp",
+            )
+            available_snapshot_columns: List[str] = []
+            for row in snapshot_info:
+                name = row["name"] if "name" in row.keys() else row[1]
+                available_snapshot_columns.append(name)
+                if name in preferred_snapshot_columns and snapshot_date_column is None:
+                    snapshot_date_column = name
+
+            if snapshot_date_column:
+                try:
+                    snap_row = conn.execute(
+                        f"""
+                        SELECT MAX({snapshot_date_column}) AS latest_snapshot
+                        FROM portfolio_snapshots
+                        """
+                    ).fetchone()
+                    if snap_row and snap_row["latest_snapshot"]:
+                        latest_snapshot_date = _parse_date(snap_row["latest_snapshot"])
+                except Exception:  # noqa: BLE001
+                    LOGGER.exception("查询 portfolio_snapshots 失败", extra=LOG_EXTRA)
+            elif available_snapshot_columns:
+                results.append(
+                    DataQualityResult(
+                        "持仓快照",
+                        "WARN",
+                        "未找到标准快照日期字段(如 as_of/snapshot_date),请确认表结构。",
+                        {"columns": available_snapshot_columns},
+                    )
+                )
+            else:
+                results.append(
+                    DataQualityResult(
+                        "持仓快照",
+                        "WARN",
+                        "未检测到 portfolio_snapshots 表,无法校验持仓快照。",
+                    )
+                )
+
+            if open_position_count > 0:
+                if latest_snapshot_date is None:
+                    results.append(
+                        DataQualityResult(
+                            "持仓快照",
+                            "WARN",
+                            "存在未平仓头寸,但未找到任何持仓快照记录。",
+                            {"open_positions": open_position_count},
+                        )
+                    )
+                elif latest_snapshot_date < window_start:
+                    results.append(
+                        DataQualityResult(
+                            "持仓快照",
+                            "WARN",
+                            f"最新持仓快照停留在 {latest_snapshot_date},已超过窗口 {window_days} 天。",
+                            {
+                                "latest_snapshot": latest_snapshot_date.isoformat(),
+                                "open_positions": open_position_count,
+                            },
+                        )
+                    )
+                else:
+                    results.append(
+                        DataQualityResult(
+                            "持仓快照",
+                            "INFO",
+                            f"最新持仓快照日期:{latest_snapshot_date}。",
+                            {
+                                "latest_snapshot": latest_snapshot_date.isoformat(),
+                                "open_positions": open_position_count,
+                            },
+                        )
+                    )
+
+            # 4. News freshness
+            latest_news_date = None
+            try:
+                news_row = conn.execute(
+                    """
+                    SELECT MAX(pub_time) AS latest_pub
+                    FROM news
+                    """
+                ).fetchone()
+                if news_row and news_row["latest_pub"]:
+                    try:
+                        latest_news_date = datetime.fromisoformat(
+                            str(news_row["latest_pub"])
+                        )
+                    except ValueError:
+                        LOGGER.debug(
+                            "无法解析新闻时间字段 value=%s",
+                            news_row["latest_pub"],
+                            extra=LOG_EXTRA,
+                        )
+            except Exception:  # noqa: BLE001
+                LOGGER.exception("查询 news 失败", extra=LOG_EXTRA)
+
+            if latest_news_date:
+                if latest_news_date.tzinfo is not None:
+                    now_ts = datetime.now(tz=latest_news_date.tzinfo)
+                else:
+                    now_ts = datetime.now()
+                delta_days = (now_ts - latest_news_date).days
+                if delta_days > window_days:
+                    results.append(
+                        DataQualityResult(
+                            "新闻数据时效",
+                            "WARN",
+                            f"最新新闻时间为 {latest_news_date}, 已超过 {window_days} 天未更新。",
+                            {"latest_pub_time": str(latest_news_date)},
+                        )
+                    )
+                else:
+                    results.append(
+                        DataQualityResult(
+                            "新闻数据时效",
+                            "INFO",
+                            f"新闻数据最新时间:{latest_news_date}",
+                            {"latest_pub_time": str(latest_news_date)},
+                        )
+                    )
+            else:
+                results.append(
+                    DataQualityResult(
+                        "新闻数据时效",
+                        "WARN",
+                        "未找到最新新闻记录,请检查 RSS 或新闻数据接入。",
+                    )
+                )
+    except Exception:  # noqa: BLE001
+        LOGGER.exception("执行数据质量检查失败", extra=LOG_EXTRA)
+        results.append(
+            DataQualityResult(
+                "运行状态",
+                "ERROR",
+                "数据质量检查过程中发生异常,请查看日志。",
+            )
+        )
+
+    return results
diff --git a/docs/TODO.md b/docs/TODO.md
index a6cca09..a1fd722 100644
--- a/docs/TODO.md
+++ b/docs/TODO.md
@@ -38,7 +38,7 @@
 
 | 工作项 | 状态 | 说明 |
 | --- | --- | --- |
-| 一键重评估入口 | ⏳ | 今日计划页需支持重新触发评估流程。 |
+| 一键重评估入口 | ✅ | 今日计划页提供批量/全量重评估入口,待收集反馈再做优化。 |
 | 回测实验对比 | ⏳ | 提供提示/温度多版本实验管理与曲线对比。 |
 | 实时指标面板 | ✅ | Streamlit 监控页已具备核心实时指标。 |
 | 异常日志钻取 | ⏳ | 待补充筛选、定位与历史对比能力。 |
@@ -60,7 +60,7 @@
 | --- | --- | --- |
 | 单元/集成测试覆盖 | ⏳ | 补充部门上下文、LLM 调用、回测指标等核心路径测试。 |
 | 决策流程回归集 | ⏳ | 建立提示模板调整后的回归用例,确保行为可复现。 |
-| 数据质量验证 | ⏳ | 针对 ingest、特征写库建立自动化验证管线。 |
+| 数据质量验证 | 🔄 | 自检页面新增候选/策略/持仓/新闻质量检查,后续补自动化管线。 |
 | 教程与示例 | ⏳ | 编写 Notebook 或 End-to-End 教程覆盖“数据→回测→调参→评估”。 |
 | 日志收集机制 | ⏳ | 完善日志聚合与查询工具,支撑问题定位。 |
 