From d2a056d7c09b4609db73f223fc6a5abd8c760eea Mon Sep 17 00:00:00 2001 From: sam Date: Fri, 17 Oct 2025 10:28:12 +0800 Subject: [PATCH] add monitor mode for reevaluation without database writes --- app/backtest/engine.py | 6 +- app/ui/views/today.py | 195 ++++++++++++++++++++++++++++++++++------- docs/TODO.md | 6 +- 3 files changed, 172 insertions(+), 35 deletions(-) diff --git a/app/backtest/engine.py b/app/backtest/engine.py index f04373c..079cdc5 100644 --- a/app/backtest/engine.py +++ b/app/backtest/engine.py @@ -93,9 +93,10 @@ class BacktestSession: class BacktestEngine: """Runs the multi-agent game inside a daily event-driven loop.""" - def __init__(self, cfg: BtConfig) -> None: + def __init__(self, cfg: BtConfig, *, persist_results: bool = True) -> None: self.cfg = cfg self.agents = default_agents() + self.persist_results = bool(persist_results) app_cfg = get_config() weight_config = app_cfg.agent_weights.as_dict() if app_cfg.agent_weights else {} if weight_config: @@ -572,6 +573,9 @@ class BacktestEngine: ) ) + if not self.persist_results: + return + try: with db_session() as conn: conn.executemany( diff --git a/app/ui/views/today.py b/app/ui/views/today.py index 7f46897..a439023 100644 --- a/app/ui/views/today.py +++ b/app/ui/views/today.py @@ -3,6 +3,7 @@ from __future__ import annotations import json from collections import Counter +from dataclasses import dataclass from datetime import date, datetime, timedelta from typing import Dict, List, Optional @@ -111,15 +112,115 @@ def _fetch_agent_actions(trade_date: str, symbols: List[str]) -> Dict[str, Dict[ return mapping +@dataclass +class ReevaluationOutcome: + changes: List[Dict[str, object]] + summary: List[Dict[str, object]] + monitor_only: bool = False + + +def _summarize_decisions( + records: List[tuple[str, object, object]], + engine: BacktestEngine, + expected_symbols: List[str], +) -> tuple[Dict[str, Dict[str, Optional[str]]], List[Dict[str, object]]]: + agent_names = [agent.name for agent in getattr(engine, "agents", [])] + after_map: Dict[str, Dict[str, Optional[str]]] = {} + summary_rows: List[Dict[str, object]] = [] + seen_codes: set[str] = set() + + for ts_code, _context, decision in records: + seen_codes.add(ts_code) + agent_actions: Dict[str, Optional[str]] = {} + final_action = getattr(decision, "action", None) + agent_actions["global"] = getattr(final_action, "value", final_action) + + department_map = getattr(decision, "department_decisions", {}) or {} + for dept_code, dept_decision in department_map.items(): + agent_actions[f"dept_{dept_code}"] = getattr(dept_decision.action, "value", dept_decision.action) + + utilities = getattr(decision, "utilities", {}) or {} + for agent_name in agent_names: + best_action: Optional[str] = None + best_score: Optional[float] = None + for action, agent_scores in utilities.items(): + if not isinstance(agent_scores, dict): + continue + score = agent_scores.get(agent_name) + if score is None: + continue + if best_score is None or score > best_score: + best_score = score + best_action = getattr(action, "value", action) + if best_action is not None: + agent_actions[agent_name] = best_action + + after_map[ts_code] = agent_actions + target_weight = getattr(decision, "target_weight", None) + summary_rows.append( + { + "代码": ts_code, + "最终动作": getattr(final_action, "value", final_action), + "信心": float(getattr(decision, "confidence", 0.0) or 0.0), + "目标权重": float(target_weight) if target_weight is not None else None, + "需复核": bool(getattr(decision, "requires_review", False)), + "备注": "", + } + ) + + expected_unique = list(dict.fromkeys(expected_symbols)) + for code in expected_unique: + if code not in seen_codes: + summary_rows.append( + { + "代码": code, + "最终动作": None, + "信心": None, + "目标权重": None, + "需复核": None, + "备注": "未返回决策(可能缺少数据或行情)", + } + ) + + return after_map, summary_rows + + +def _render_summary_table(rows: List[Dict[str, object]], *, title: str) -> None: + if not rows: + return + df = pd.DataFrame(rows) + if df.empty: + return + formatted = df.copy() + if "信心" in formatted.columns: + formatted["信心"] = formatted["信心"].apply( + lambda x: f"{x:.2f}" if x is not None else "-" + ) + if "目标权重" in formatted.columns: + formatted["目标权重"] = formatted["目标权重"].apply( + lambda x: f"{x:+.2%}" if x is not None else "-" + ) + if "需复核" in formatted.columns: + formatted["需复核"] = formatted["需复核"].apply( + lambda x: "是" if x is True else ("否" if x is False else "-") + ) + if "备注" in formatted.columns: + formatted["备注"] = formatted["备注"].apply(lambda x: x if x else "") + st.write(title) + st.dataframe(formatted, hide_index=True, width="stretch") + + def _reevaluate_symbols( trade_date_obj: date, symbols: List[str], cfg_id: str, cfg_name: str, -) -> List[Dict[str, object]]: + *, + monitor_only: bool = False, +) -> ReevaluationOutcome: unique_symbols = list(dict.fromkeys(symbols)) if not unique_symbols: - return [] + return ReevaluationOutcome(changes=[], summary=[], monitor_only=monitor_only) trade_date_str = trade_date_obj.isoformat() before_map = _fetch_agent_actions(trade_date_str, unique_symbols) engine_params: Dict[str, object] = {} @@ -155,17 +256,26 @@ def _reevaluate_symbols( universe=unique_symbols, params=engine_params, ) - engine = BacktestEngine(cfg) + engine = BacktestEngine(cfg, persist_results=not monitor_only) state = PortfolioState(cash=engine.initial_cash) - engine.simulate_day(trade_date_obj, state) - after_map = _fetch_agent_actions(trade_date_str, unique_symbols) + records = engine.simulate_day(trade_date_obj, state) + after_map, summary_rows = _summarize_decisions(records, engine, unique_symbols) + if monitor_only: + for row in summary_rows: + if not row.get("备注"): + row["备注"] = "监控模式预览" + symbol_order = list( + dict.fromkeys(unique_symbols + list(before_map.keys()) + list(after_map.keys())) + ) changes: List[Dict[str, object]] = [] - for code in unique_symbols: + for code in symbol_order: before_agents = before_map.get(code, {}) after_agents = after_map.get(code, {}) - for agent, new_action in after_agents.items(): + agents = set(before_agents.keys()) | set(after_agents.keys()) + for agent in sorted(agents): old_action = before_agents.get(agent) - if new_action != old_action: + new_action = after_agents.get(agent) + if old_action != new_action: changes.append( { "代码": code, @@ -174,17 +284,7 @@ def _reevaluate_symbols( "新动作": new_action, } ) - for agent, old_action in before_agents.items(): - if agent not in after_agents: - changes.append( - { - "代码": code, - "代理": agent, - "原动作": old_action, - "新动作": None, - } - ) - return changes + return ReevaluationOutcome(changes=changes, summary=summary_rows, monitor_only=monitor_only) def render_today_plan() -> None: @@ -281,25 +381,39 @@ def render_today_plan() -> None: metrics_cols[0].metric("已产出决策数", len(agent_symbols)) metrics_cols[1].metric("候选池标的", len(candidate_records)) metrics_cols[2].metric("当前持仓", len(position_codes)) - st.caption("对当前交易日所有标的触发策略重评估。") + monitor_only = st.checkbox( + "仅监控模式(不写入数据库)", + key="reeval_monitor_only", + ) + if monitor_only: + st.caption("监控模式:仅生成策略预览,不更新 agent_utils 或候选池记录。") + else: + st.caption("对当前交易日所有标的触发策略重评估。") if st.button("一键重评估全部", type="primary", width="stretch"): with st.spinner("正在对所有标的进行重评估,请稍候..."): try: trade_date_obj = _parse_trade_date(trade_date) progress = st.progress(0.0) progress.progress(0.3 if symbols else 1.0) - changes_all = _reevaluate_symbols( + outcome = _reevaluate_symbols( trade_date_obj, symbols, "reeval_ui_all", "UI All Re-eval", + monitor_only=monitor_only, ) progress.progress(1.0) st.success(f"一键重评估完成:共处理 {len(symbols)} 个标的") - if changes_all: + if outcome.changes: st.write("检测到以下动作变更:") - st.dataframe(pd.DataFrame(changes_all), hide_index=True, width='stretch') - st.rerun() + st.dataframe(pd.DataFrame(outcome.changes), hide_index=True, width='stretch') + if monitor_only: + _render_summary_table( + outcome.summary, + title="最新策略摘要(监控模式预览)", + ) + else: + st.rerun() except Exception as exc: # noqa: BLE001 LOGGER.exception("一键重评估失败", extra=LOG_EXTRA) st.error(f"一键重评估执行过程中发生错误:{exc}") @@ -800,18 +914,24 @@ def _render_today_plan_reevaluation_controls( if add_divider: st.divider() st.subheader("策略重评估") - st.caption("对当前选中的交易日与标的,立即触发一次策略评估并回写 agent_utils。") + monitor_only = bool(st.session_state.get("reeval_monitor_only", False)) + if monitor_only: + st.caption("监控模式已开启:生成策略预览但不会写入数据库。") + else: + st.caption("对当前选中的交易日与标的,立即触发一次策略评估并回写 agent_utils。") cols_re = st.columns([1, 1]) if cols_re[0].button("对该标的重评估", key=f"reevaluate_{ts_code}"): with st.spinner("正在重评估..."): try: trade_date_obj = _parse_trade_date(trade_date) - changes = _reevaluate_symbols( + outcome = _reevaluate_symbols( trade_date_obj, [ts_code], "reeval_ui", "UI Re-evaluation", + monitor_only=monitor_only, ) + changes = outcome.changes if changes: for change in changes: change.setdefault("代码", ts_code) @@ -822,7 +942,13 @@ def _render_today_plan_reevaluation_controls( st.dataframe(df_changes, hide_index=True, width='stretch') else: st.success("重评估完成,无动作变更。") - st.rerun() + if monitor_only: + _render_summary_table( + outcome.summary, + title="策略结果预览(监控模式)", + ) + else: + st.rerun() except Exception as exc: # noqa: BLE001 LOGGER.exception("重评估失败", extra=LOG_EXTRA) st.error(f"重评估失败:{exc}") @@ -837,17 +963,24 @@ def _render_today_plan_reevaluation_controls( trade_date_obj = _parse_trade_date(trade_date) progress = st.progress(0.0) progress.progress(0.3 if batch_symbols else 1.0) - changes_all = _reevaluate_symbols( + outcome = _reevaluate_symbols( trade_date_obj, batch_symbols, "reeval_ui_batch", "UI Batch Re-eval", + monitor_only=monitor_only, ) progress.progress(1.0) st.success(f"批量重评估完成:共处理 {len(batch_symbols)} 个标的") - if changes_all: - st.dataframe(pd.DataFrame(changes_all), hide_index=True, width='stretch') - st.rerun() + if outcome.changes: + st.dataframe(pd.DataFrame(outcome.changes), hide_index=True, width='stretch') + if monitor_only: + _render_summary_table( + outcome.summary, + title="批量策略结果预览(监控模式)", + ) + else: + st.rerun() except Exception as exc: # noqa: BLE001 LOGGER.exception("批量重评估失败", extra=LOG_EXTRA) st.error(f"批量重评估失败:{exc}") diff --git a/docs/TODO.md b/docs/TODO.md index 1cdec6e..f5620c3 100644 --- a/docs/TODO.md +++ b/docs/TODO.md @@ -39,10 +39,10 @@ | 工作项 | 状态 | 说明 | | --- | --- | --- | | 一键重评估入口 | ✅ | 今日计划页提供批量/全量重评估入口,待收集反馈再做优化。 | -| 回测实验对比 | 🔄 | 新增会话实验保存与曲线/指标对比,后续接入更多提示参数。 | +| 回测实验对比 | ✅ | 会话实验保存、曲线/指标对比与日期过滤均已落地,后续按需扩充提示参数。 | | 实时指标面板 | ✅ | Streamlit 监控页已具备核心实时指标。 | -| 异常日志钻取 | ⏳ | 待补充筛选、定位与历史对比能力。 | -| 仅监控模式 | ⏳ | 支持“监控不干预”场景的一键复评策略。 | +| 异常日志钻取 | ✅ | 日志视图支持时间/级别/阶段筛选、关键词搜索及双日期历史对比导出。 | +| 仅监控模式 | ✅ | 今日计划页新增监控模式一键重评估,预览策略而不写入数据库。 | | 风险预警面板 | ✅ | 风险预警能力已落地,仍需持续扩充指标。 | ## 风险控制与执行