diff --git a/app/ui/streamlit_app.py b/app/ui/streamlit_app.py index 489075d..7027c3a 100644 --- a/app/ui/streamlit_app.py +++ b/app/ui/streamlit_app.py @@ -415,13 +415,10 @@ def render_today_plan() -> None: LOGGER.exception("加载 agent_utils 失败", extra=LOG_EXTRA) st.warning("暂未写入部门/代理决策,请先运行回测或策略评估流程。") return - trade_dates = [row["trade_date"] for row in date_rows] if not trade_dates: st.info("暂无决策记录,完成一次回测后即可在此查看部门意见与投票结果。") return - - # ADD: read default selection from URL q = _get_query_params() default_trade_date = q.get("date", [trade_dates[0]])[0] try: @@ -429,7 +426,6 @@ def render_today_plan() -> None: except ValueError: default_idx = 0 trade_date = st.selectbox("交易日", trade_dates, index=default_idx) - with db_session(read_only=True) as conn: code_rows = conn.execute( """ @@ -441,22 +437,9 @@ def render_today_plan() -> None: (trade_date,), ).fetchall() symbols = [row["ts_code"] for row in code_rows] - if not symbols: - st.info("所选交易日暂无 agent_utils 记录。") - return - - # ADD: 投资助理模式开关(不指定具体标的) - assistant_mode = st.checkbox( - "投资助理(不指定标的)", - value=False, - help="开启后不显示具体标的,展示基于候选投资池的组合级建议与汇总信息。", - ) - - if assistant_mode: - ts_code = None - batch_symbols = [] + detail_tab, assistant_tab = st.tabs(["标的详情", "投资助理模式"]) + with assistant_tab: st.info("已开启投资助理模式:以下内容为组合级(去标的)建议,不包含任何具体标的代码。") - # 展示候选池聚合信息(不暴露具体代码) try: candidates = list_investment_pool(trade_date=trade_date) if candidates: @@ -525,684 +508,545 @@ def render_today_plan() -> None: except Exception: LOGGER.exception("加载候选池聚合信息失败", extra=LOG_EXTRA) st.error("加载候选池数据时发生错误。") - else: - default_ts = q.get("code", [symbols[0]])[0] - try: - default_ts_idx = symbols.index(default_ts) - except ValueError: - default_ts_idx = 0 - ts_code = st.selectbox("标的", symbols, index=default_ts_idx) - # ADD: batch selection for re-evaluation - batch_symbols = st.multiselect("批量重评估(可多选)", symbols, default=[]) - - # 一键重评估所有标的按钮 - if st.button("一键重评估所有标的", type="primary", width='stretch'): - with st.spinner("正在对所有标的进行重评估,请稍候..."): + with detail_tab: + if not symbols: + st.info("所选交易日暂无 agent_utils 记录。") + else: + default_ts = q.get("code", [symbols[0]])[0] try: - # 解析交易日 - trade_date_obj = None + default_ts_idx = symbols.index(default_ts) + except ValueError: + default_ts_idx = 0 + ts_code = st.selectbox("标的", symbols, index=default_ts_idx) + batch_symbols = st.multiselect("批量重评估(可多选)", symbols, default=[]) + + if st.button("一键重评估所有标的", type="primary", width='stretch'): + with st.spinner("正在对所有标的进行重评估,请稍候..."): + try: + # 解析交易日 + trade_date_obj = None + try: + trade_date_obj = date.fromisoformat(str(trade_date)) + except Exception: + try: + trade_date_obj = datetime.strptime(str(trade_date), "%Y%m%d").date() + except Exception: + pass + if trade_date_obj is None: + raise ValueError(f"无法解析交易日:{trade_date}") + + progress = st.progress(0.0) + changes_all = [] + success_count = 0 + error_count = 0 + + # 遍历所有标的 + for idx, code in enumerate(symbols, start=1): + try: + # 保存重评估前的状态 + with db_session(read_only=True) as conn: + before_rows = conn.execute( + "SELECT agent, action FROM agent_utils WHERE trade_date = ? 
AND ts_code = ?", + (trade_date, code), + ).fetchall() + before_map = {row["agent"]: row["action"] for row in before_rows} + + # 执行重评估 + cfg = BtConfig( + id="reeval_ui_all", + name="UI All Re-eval", + start_date=trade_date_obj, + end_date=trade_date_obj, + universe=[code], + params={}, + ) + engine = BacktestEngine(cfg) + state = PortfolioState() + _ = engine.simulate_day(trade_date_obj, state) + + # 检查变化 + with db_session(read_only=True) as conn: + after_rows = conn.execute( + "SELECT agent, action FROM agent_utils WHERE trade_date = ? AND ts_code = ?", + (trade_date, code), + ).fetchall() + for row in after_rows: + agent = row["agent"] + new_action = row["action"] + old_action = before_map.get(agent) + if new_action != old_action: + changes_all.append({"代码": code, "代理": agent, "原动作": old_action, "新动作": new_action}) + success_count += 1 + except Exception as e: + LOGGER.exception(f"重评估 {code} 失败", extra=LOG_EXTRA) + error_count += 1 + + # 更新进度 + progress.progress(idx / len(symbols)) + + # 显示结果 + if error_count > 0: + st.error(f"一键重评估完成:成功 {success_count} 个,失败 {error_count} 个") + else: + st.success(f"一键重评估完成:所有 {success_count} 个标的重评估成功") + + # 显示变更记录 + if changes_all: + st.write("检测到以下动作变更:") + st.dataframe(pd.DataFrame(changes_all), hide_index=True, width='stretch') + + # 刷新页面数据 + st.rerun() + except Exception as exc: + LOGGER.exception("一键重评估失败", extra=LOG_EXTRA) + st.error(f"一键重评估执行过程中发生错误:{exc}") + _set_query_params(date=str(trade_date), code=str(ts_code)) + with db_session(read_only=True) as conn: + rows = conn.execute( + """ + SELECT agent, action, utils, feasible, weight + FROM agent_utils + WHERE trade_date = ? AND ts_code = ? + ORDER BY CASE WHEN agent = 'global' THEN 1 ELSE 0 END, agent + """, + (trade_date, ts_code), + ).fetchall() + if not rows: + st.info("未查询到详细决策记录,稍后再试。") + return + try: + feasible_actions = json.loads(rows[0]["feasible"] or "[]") + except (KeyError, TypeError, json.JSONDecodeError): + feasible_actions = [] + global_info = None + dept_records: List[Dict[str, object]] = [] + dept_details: Dict[str, Dict[str, object]] = {} + agent_records: List[Dict[str, object]] = [] + for item in rows: + agent_name = item["agent"] + action = item["action"] + weight = float(item["weight"] or 0.0) try: - trade_date_obj = date.fromisoformat(str(trade_date)) + utils = json.loads(item["utils"] or "{}") + except json.JSONDecodeError: + utils = {} + + if agent_name == "global": + global_info = { + "action": action, + "confidence": float(utils.get("_confidence", 0.0)), + "target_weight": float(utils.get("_target_weight", 0.0)), + "department_votes": utils.get("_department_votes", {}), + "requires_review": bool(utils.get("_requires_review", False)), + "scope_values": utils.get("_scope_values", {}), + "close_series": utils.get("_close_series", []), + "turnover_series": utils.get("_turnover_series", []), + "department_supplements": utils.get("_department_supplements", {}), + "department_dialogue": utils.get("_department_dialogue", {}), + "department_telemetry": utils.get("_department_telemetry", {}), + } + continue + + if agent_name.startswith("dept_"): + code = agent_name.split("dept_", 1)[-1] + signals = utils.get("_signals", []) + risks = utils.get("_risks", []) + supplements = utils.get("_supplements", []) + dialogue = utils.get("_dialogue", []) + telemetry = utils.get("_telemetry", {}) + dept_records.append( + { + "部门": code, + "行动": action, + "信心": float(utils.get("_confidence", 0.0)), + "权重": weight, + "摘要": utils.get("_summary", ""), + "核心信号": ";".join(signals) if 
isinstance(signals, list) else signals, + "风险提示": ";".join(risks) if isinstance(risks, list) else risks, + "补充次数": len(supplements) if isinstance(supplements, list) else 0, + } + ) + dept_details[code] = { + "supplements": supplements if isinstance(supplements, list) else [], + "dialogue": dialogue if isinstance(dialogue, list) else [], + "summary": utils.get("_summary", ""), + "signals": signals, + "risks": risks, + "telemetry": telemetry if isinstance(telemetry, dict) else {}, + } + else: + score_map = { + key: float(val) + for key, val in utils.items() + if not str(key).startswith("_") + } + agent_records.append( + { + "代理": agent_name, + "建议动作": action, + "权重": weight, + "SELL": score_map.get("SELL", 0.0), + "HOLD": score_map.get("HOLD", 0.0), + "BUY_S": score_map.get("BUY_S", 0.0), + "BUY_M": score_map.get("BUY_M", 0.0), + "BUY_L": score_map.get("BUY_L", 0.0), + } + ) + if feasible_actions: + st.caption(f"可行操作集合:{', '.join(feasible_actions)}") + st.subheader("全局策略") + if global_info: + col1, col2, col3 = st.columns(3) + col1.metric("最终行动", global_info["action"]) + col2.metric("信心", f"{global_info['confidence']:.2f}") + col3.metric("目标权重", f"{global_info['target_weight']:+.2%}") + if global_info["department_votes"]: + st.json(global_info["department_votes"]) + if global_info["requires_review"]: + st.warning("部门分歧较大,已标记为需人工复核。") + with st.expander("基础上下文数据", expanded=False): + # ADD: export buttons + scope = global_info.get("scope_values") or {} + close_series = global_info.get("close_series") or [] + turnover_series = global_info.get("turnover_series") or [] + st.write("最新字段:") + if scope: + st.json(scope) + st.download_button( + "下载字段(JSON)", + data=json.dumps(scope, ensure_ascii=False, indent=2), + file_name=f"{ts_code}_{trade_date}_scope.json", + mime="application/json", + key="dl_scope_json", + ) + if close_series: + st.write("收盘价时间序列 (最近窗口):") + st.json(close_series) + try: + import io, csv + buf = io.StringIO() + writer = csv.writer(buf) + writer.writerow(["trade_date", "close"]) + for dt, val in close_series: + writer.writerow([dt, val]) + st.download_button( + "下载收盘价(CSV)", + data=buf.getvalue(), + file_name=f"{ts_code}_{trade_date}_close_series.csv", + mime="text/csv", + key="dl_close_csv", + ) + except Exception: + pass + if turnover_series: + st.write("换手率时间序列 (最近窗口):") + st.json(turnover_series) + dept_sup = global_info.get("department_supplements") or {} + dept_dialogue = global_info.get("department_dialogue") or {} + dept_telemetry = global_info.get("department_telemetry") or {} + if dept_sup or dept_dialogue: + with st.expander("部门补数与对话记录", expanded=False): + if dept_sup: + st.write("补充数据:") + st.json(dept_sup) + if dept_dialogue: + st.write("对话片段:") + st.json(dept_dialogue) + if dept_telemetry: + with st.expander("部门 LLM 元数据", expanded=False): + st.json(dept_telemetry) + else: + st.info("暂未写入全局策略摘要。") + st.subheader("部门意见") + if dept_records: + # ADD: keyword filter for department summaries + keyword = st.text_input("筛选摘要/信号关键词", value="") + filtered = dept_records + if keyword.strip(): + kw = keyword.strip() + filtered = [ + item for item in dept_records + if kw in str(item.get("摘要", "")) or kw in str(item.get("核心信号", "")) + ] + # ADD: confidence filter and sort + min_conf = st.slider("最低信心过滤", 0.0, 1.0, 0.0, 0.05) + sort_col = st.selectbox("排序列", ["信心", "权重"], index=0) + filtered = [row for row in filtered if float(row.get("信心", 0.0)) >= min_conf] + filtered = sorted(filtered, key=lambda r: float(r.get(sort_col, 0.0)), reverse=True) + dept_df = pd.DataFrame(filtered) 
+ st.dataframe(dept_df, width='stretch', hide_index=True) + try: + st.download_button( + "下载部门意见(CSV)", + data=dept_df.to_csv(index=False), + file_name=f"{trade_date}_{ts_code}_departments.csv", + mime="text/csv", + key="dl_dept_csv", + ) except Exception: + pass + for code, details in dept_details.items(): + with st.expander(f"{code} 补充详情", expanded=False): + supplements = details.get("supplements", []) + dialogue = details.get("dialogue", []) + if supplements: + st.write("补充数据:") + st.json(supplements) + else: + st.caption("无补充数据请求。") + if dialogue: + st.write("对话记录:") + for idx, line in enumerate(dialogue, start=1): + st.markdown(f"**回合 {idx}:** {line}") + else: + st.caption("无额外对话。") + telemetry = details.get("telemetry") or {} + if telemetry: + st.write("LLM 元数据:") + st.json(telemetry) + else: + st.info("暂无部门记录。") + st.subheader("代理评分") + if agent_records: + # ADD: sorting and CSV export for agents + sort_agent_by = st.selectbox( + "代理排序", + ["权重", "SELL", "HOLD", "BUY_S", "BUY_M", "BUY_L"], + index=1, + ) + agent_df = pd.DataFrame(agent_records) + if sort_agent_by in agent_df.columns: + agent_df = agent_df.sort_values(sort_agent_by, ascending=False) + st.dataframe(agent_df, width='stretch', hide_index=True) + try: + st.download_button( + "下载代理评分(CSV)", + data=agent_df.to_csv(index=False), + file_name=f"{trade_date}_{ts_code}_agents.csv", + mime="text/csv", + key="dl_agent_csv", + ) + except Exception: + pass + else: + st.info("暂无基础代理评分。") + st.divider() + st.subheader("相关新闻") + try: + with db_session(read_only=True) as conn: + # 解析当前trade_date为datetime对象 try: - trade_date_obj = datetime.strptime(str(trade_date), "%Y%m%d").date() - except Exception: - pass - if trade_date_obj is None: - raise ValueError(f"无法解析交易日:{trade_date}") + trade_date_obj = date.fromisoformat(str(trade_date)) + except: + try: + trade_date_obj = datetime.strptime(str(trade_date), "%Y%m%d").date() + except: + # 如果解析失败,使用当前日期向前推7天 + trade_date_obj = date.today() - timedelta(days=7) + + # 查询近7天内与当前标的相关的新闻,按发布时间降序排列 + news_query = """ + SELECT id, title, source, pub_time, sentiment, heat, entities + FROM news + WHERE ts_code = ? AND pub_time >= ? + ORDER BY pub_time DESC + LIMIT 10 + """ + # 计算7天前的日期字符串 + seven_days_ago = (trade_date_obj - timedelta(days=7)).strftime("%Y-%m-%d") + news_rows = conn.execute(news_query, (ts_code, seven_days_ago)).fetchall() + + if news_rows: + news_data = [] + for row in news_rows: + # 解析entities字段获取更多信息 + entities_info = {} + try: + if row["entities"]: + entities_info = json.loads(row["entities"]) + except (json.JSONDecodeError, TypeError): + pass - progress = st.progress(0.0) - changes_all = [] - success_count = 0 - error_count = 0 + # 准备新闻数据 + news_item = { + "标题": row["title"], + "来源": row["source"], + "发布时间": row["pub_time"], + "情感指数": f"{row['sentiment']:.2f}" if row["sentiment"] is not None else "-", + "热度评分": f"{row['heat']:.2f}" if row["heat"] is not None else "-" + } - # 遍历所有标的 - for idx, code in enumerate(symbols, start=1): + # 如果有行业信息,添加到展示数据中 + industries = entities_info.get("industries", []) + if industries: + news_item["相关行业"] = "、".join(industries[:3]) # 只显示前3个行业 + + news_data.append(news_item) + + # 显示新闻表格 + news_df = pd.DataFrame(news_data) + # 确保所有列都是字符串类型,避免PyArrow序列化错误 + for col in news_df.columns: + news_df[col] = news_df[col].astype(str) + st.dataframe(news_df, width='stretch', hide_index=True) + + # 添加新闻详情展开视图 + st.write("详细新闻内容:") + for idx, row in enumerate(news_rows): + with st.expander(f"{idx+1}. 
{row['title']}", expanded=False): + st.write(f"**来源:** {row['source']}") + st.write(f"**发布时间:** {row['pub_time']}") + + # 解析entities获取更多详细信息 + entities_info = {} + try: + if row["entities"]: + entities_info = json.loads(row["entities"]) + except (json.JSONDecodeError, TypeError): + pass + + # 显示情感和热度信息 + sentiment_display = f"{row['sentiment']:.2f}" if row["sentiment"] is not None else "-" + heat_display = f"{row['heat']:.2f}" if row["heat"] is not None else "-" + st.write(f"**情感指数:** {sentiment_display} | **热度评分:** {heat_display}") + + # 显示行业信息 + industries = entities_info.get("industries", []) + if industries: + st.write(f"**相关行业:** {'、'.join(industries)}") + + # 显示重要关键词 + important_keywords = entities_info.get("important_keywords", []) + if important_keywords: + st.write(f"**重要关键词:** {'、'.join(important_keywords)}") + + # 显示URL链接(如果有) + url = entities_info.get("source_url", "") + if url: + st.markdown(f"[查看原文]({url})", unsafe_allow_html=True) + else: + st.info(f"近7天内暂无关于 {ts_code} 的新闻。") + except Exception as e: + LOGGER.exception("获取新闻数据失败", extra=LOG_EXTRA) + st.error(f"获取新闻数据时发生错误:{e}") + st.divider() + st.info("投资池与仓位概览已移至单独页面。请在侧边或页面导航中选择“投资池/仓位”以查看详细信息。") + st.divider() + st.subheader("策略重评估") + st.caption("对当前选中的交易日与标的,立即触发一次策略评估并回写 agent_utils。") + cols_re = st.columns([1,1]) + if cols_re[0].button("对该标的重评估", key="reevaluate_current_symbol"): + with st.spinner("正在重评估..."): try: - # 保存重评估前的状态 + trade_date_obj = None + try: + trade_date_obj = date.fromisoformat(str(trade_date)) + except Exception: + try: + trade_date_obj = datetime.strptime(str(trade_date), "%Y%m%d").date() + except Exception: + pass + if trade_date_obj is None: + raise ValueError(f"无法解析交易日:{trade_date}") + # snapshot before with db_session(read_only=True) as conn: before_rows = conn.execute( - "SELECT agent, action FROM agent_utils WHERE trade_date = ? AND ts_code = ?", - (trade_date, code), + """ + SELECT agent, action, utils FROM agent_utils + WHERE trade_date = ? AND ts_code = ? + """, + (trade_date, ts_code), ).fetchall() - before_map = {row["agent"]: row["action"] for row in before_rows} - - # 执行重评估 + before_map = {row["agent"]: (row["action"], row["utils"]) for row in before_rows} cfg = BtConfig( - id="reeval_ui_all", - name="UI All Re-eval", + id="reeval_ui", + name="UI Re-evaluation", start_date=trade_date_obj, end_date=trade_date_obj, - universe=[code], + universe=[ts_code], params={}, ) engine = BacktestEngine(cfg) state = PortfolioState() _ = engine.simulate_day(trade_date_obj, state) - - # 检查变化 + # compare after with db_session(read_only=True) as conn: after_rows = conn.execute( - "SELECT agent, action FROM agent_utils WHERE trade_date = ? AND ts_code = ?", - (trade_date, code), + """ + SELECT agent, action, utils FROM agent_utils + WHERE trade_date = ? AND ts_code = ? 
+ """, + (trade_date, ts_code), ).fetchall() + changes = [] for row in after_rows: agent = row["agent"] new_action = row["action"] - old_action = before_map.get(agent) + old_action, _old_utils = before_map.get(agent, (None, None)) if new_action != old_action: - changes_all.append({"代码": code, "代理": agent, "原动作": old_action, "新动作": new_action}) - success_count += 1 - except Exception as e: - LOGGER.exception(f"重评估 {code} 失败", extra=LOG_EXTRA) - error_count += 1 - - # 更新进度 - progress.progress(idx / len(symbols)) - - # 显示结果 - if error_count > 0: - st.error(f"一键重评估完成:成功 {success_count} 个,失败 {error_count} 个") - else: - st.success(f"一键重评估完成:所有 {success_count} 个标的重评估成功") - - # 显示变更记录 - if changes_all: - st.write("检测到以下动作变更:") - st.dataframe(pd.DataFrame(changes_all), hide_index=True, width='stretch') - - # 刷新页面数据 - st.rerun() - except Exception as exc: - LOGGER.exception("一键重评估失败", extra=LOG_EXTRA) - st.error(f"一键重评估执行过程中发生错误:{exc}") - - # sync URL params - _set_query_params(date=str(trade_date), code=str(ts_code)) - - with db_session(read_only=True) as conn: - rows = conn.execute( - """ - SELECT agent, action, utils, feasible, weight - FROM agent_utils - WHERE trade_date = ? AND ts_code = ? - ORDER BY CASE WHEN agent = 'global' THEN 1 ELSE 0 END, agent - """, - (trade_date, ts_code), - ).fetchall() - - if not rows: - st.info("未查询到详细决策记录,稍后再试。") - return - - try: - feasible_actions = json.loads(rows[0]["feasible"] or "[]") - except (KeyError, TypeError, json.JSONDecodeError): - feasible_actions = [] - - global_info = None - dept_records: List[Dict[str, object]] = [] - dept_details: Dict[str, Dict[str, object]] = {} - agent_records: List[Dict[str, object]] = [] - - for item in rows: - agent_name = item["agent"] - action = item["action"] - weight = float(item["weight"] or 0.0) - try: - utils = json.loads(item["utils"] or "{}") - except json.JSONDecodeError: - utils = {} - - if agent_name == "global": - global_info = { - "action": action, - "confidence": float(utils.get("_confidence", 0.0)), - "target_weight": float(utils.get("_target_weight", 0.0)), - "department_votes": utils.get("_department_votes", {}), - "requires_review": bool(utils.get("_requires_review", False)), - "scope_values": utils.get("_scope_values", {}), - "close_series": utils.get("_close_series", []), - "turnover_series": utils.get("_turnover_series", []), - "department_supplements": utils.get("_department_supplements", {}), - "department_dialogue": utils.get("_department_dialogue", {}), - "department_telemetry": utils.get("_department_telemetry", {}), - } - continue - - if agent_name.startswith("dept_"): - code = agent_name.split("dept_", 1)[-1] - signals = utils.get("_signals", []) - risks = utils.get("_risks", []) - supplements = utils.get("_supplements", []) - dialogue = utils.get("_dialogue", []) - telemetry = utils.get("_telemetry", {}) - dept_records.append( - { - "部门": code, - "行动": action, - "信心": float(utils.get("_confidence", 0.0)), - "权重": weight, - "摘要": utils.get("_summary", ""), - "核心信号": ";".join(signals) if isinstance(signals, list) else signals, - "风险提示": ";".join(risks) if isinstance(risks, list) else risks, - "补充次数": len(supplements) if isinstance(supplements, list) else 0, - } - ) - dept_details[code] = { - "supplements": supplements if isinstance(supplements, list) else [], - "dialogue": dialogue if isinstance(dialogue, list) else [], - "summary": utils.get("_summary", ""), - "signals": signals, - "risks": risks, - "telemetry": telemetry if isinstance(telemetry, dict) else {}, - } - else: - score_map = { - 
key: float(val) - for key, val in utils.items() - if not str(key).startswith("_") - } - agent_records.append( - { - "代理": agent_name, - "建议动作": action, - "权重": weight, - "SELL": score_map.get("SELL", 0.0), - "HOLD": score_map.get("HOLD", 0.0), - "BUY_S": score_map.get("BUY_S", 0.0), - "BUY_M": score_map.get("BUY_M", 0.0), - "BUY_L": score_map.get("BUY_L", 0.0), - } - ) - - if feasible_actions: - st.caption(f"可行操作集合:{', '.join(feasible_actions)}") - - st.subheader("全局策略") - if global_info: - col1, col2, col3 = st.columns(3) - col1.metric("最终行动", global_info["action"]) - col2.metric("信心", f"{global_info['confidence']:.2f}") - col3.metric("目标权重", f"{global_info['target_weight']:+.2%}") - if global_info["department_votes"]: - st.json(global_info["department_votes"]) - if global_info["requires_review"]: - st.warning("部门分歧较大,已标记为需人工复核。") - with st.expander("基础上下文数据", expanded=False): - # ADD: export buttons - scope = global_info.get("scope_values") or {} - close_series = global_info.get("close_series") or [] - turnover_series = global_info.get("turnover_series") or [] - st.write("最新字段:") - if scope: - st.json(scope) - st.download_button( - "下载字段(JSON)", - data=json.dumps(scope, ensure_ascii=False, indent=2), - file_name=f"{ts_code}_{trade_date}_scope.json", - mime="application/json", - key="dl_scope_json", - ) - if close_series: - st.write("收盘价时间序列 (最近窗口):") - st.json(close_series) - try: - import io, csv - buf = io.StringIO() - writer = csv.writer(buf) - writer.writerow(["trade_date", "close"]) - for dt, val in close_series: - writer.writerow([dt, val]) - st.download_button( - "下载收盘价(CSV)", - data=buf.getvalue(), - file_name=f"{ts_code}_{trade_date}_close_series.csv", - mime="text/csv", - key="dl_close_csv", - ) - except Exception: - pass - if turnover_series: - st.write("换手率时间序列 (最近窗口):") - st.json(turnover_series) - dept_sup = global_info.get("department_supplements") or {} - dept_dialogue = global_info.get("department_dialogue") or {} - dept_telemetry = global_info.get("department_telemetry") or {} - if dept_sup or dept_dialogue: - with st.expander("部门补数与对话记录", expanded=False): - if dept_sup: - st.write("补充数据:") - st.json(dept_sup) - if dept_dialogue: - st.write("对话片段:") - st.json(dept_dialogue) - if dept_telemetry: - with st.expander("部门 LLM 元数据", expanded=False): - st.json(dept_telemetry) - else: - st.info("暂未写入全局策略摘要。") - - st.subheader("部门意见") - if dept_records: - # ADD: keyword filter for department summaries - keyword = st.text_input("筛选摘要/信号关键词", value="") - filtered = dept_records - if keyword.strip(): - kw = keyword.strip() - filtered = [ - item for item in dept_records - if kw in str(item.get("摘要", "")) or kw in str(item.get("核心信号", "")) - ] - # ADD: confidence filter and sort - min_conf = st.slider("最低信心过滤", 0.0, 1.0, 0.0, 0.05) - sort_col = st.selectbox("排序列", ["信心", "权重"], index=0) - filtered = [row for row in filtered if float(row.get("信心", 0.0)) >= min_conf] - filtered = sorted(filtered, key=lambda r: float(r.get(sort_col, 0.0)), reverse=True) - dept_df = pd.DataFrame(filtered) - st.dataframe(dept_df, width='stretch', hide_index=True) - try: - st.download_button( - "下载部门意见(CSV)", - data=dept_df.to_csv(index=False), - file_name=f"{trade_date}_{ts_code}_departments.csv", - mime="text/csv", - key="dl_dept_csv", - ) - except Exception: - pass - for code, details in dept_details.items(): - with st.expander(f"{code} 补充详情", expanded=False): - supplements = details.get("supplements", []) - dialogue = details.get("dialogue", []) - if supplements: - st.write("补充数据:") - 
st.json(supplements) - else: - st.caption("无补充数据请求。") - if dialogue: - st.write("对话记录:") - for idx, line in enumerate(dialogue, start=1): - st.markdown(f"**回合 {idx}:** {line}") - else: - st.caption("无额外对话。") - telemetry = details.get("telemetry") or {} - if telemetry: - st.write("LLM 元数据:") - st.json(telemetry) - else: - st.info("暂无部门记录。") - - st.subheader("代理评分") - if agent_records: - # ADD: sorting and CSV export for agents - sort_agent_by = st.selectbox( - "代理排序", - ["权重", "SELL", "HOLD", "BUY_S", "BUY_M", "BUY_L"], - index=1, - ) - agent_df = pd.DataFrame(agent_records) - if sort_agent_by in agent_df.columns: - agent_df = agent_df.sort_values(sort_agent_by, ascending=False) - st.dataframe(agent_df, width='stretch', hide_index=True) - try: - st.download_button( - "下载代理评分(CSV)", - data=agent_df.to_csv(index=False), - file_name=f"{trade_date}_{ts_code}_agents.csv", - mime="text/csv", - key="dl_agent_csv", - ) - except Exception: - pass - else: - st.info("暂无基础代理评分。") - - # 添加相关新闻展示部分 - st.divider() - st.subheader("相关新闻") - # 获取与当前标的相关的最新新闻 - try: - with db_session(read_only=True) as conn: - # 解析当前trade_date为datetime对象 - try: - trade_date_obj = date.fromisoformat(str(trade_date)) - except: - try: - trade_date_obj = datetime.strptime(str(trade_date), "%Y%m%d").date() - except: - # 如果解析失败,使用当前日期向前推7天 - trade_date_obj = date.today() - timedelta(days=7) - - # 查询近7天内与当前标的相关的新闻,按发布时间降序排列 - news_query = """ - SELECT id, title, source, pub_time, sentiment, heat, entities - FROM news - WHERE ts_code = ? AND pub_time >= ? - ORDER BY pub_time DESC - LIMIT 10 - """ - # 计算7天前的日期字符串 - seven_days_ago = (trade_date_obj - timedelta(days=7)).strftime("%Y-%m-%d") - news_rows = conn.execute(news_query, (ts_code, seven_days_ago)).fetchall() - - if news_rows: - news_data = [] - for row in news_rows: - # 解析entities字段获取更多信息 - entities_info = {} - try: - if row["entities"]: - entities_info = json.loads(row["entities"]) - except (json.JSONDecodeError, TypeError): - pass - - # 准备新闻数据 - news_item = { - "标题": row["title"], - "来源": row["source"], - "发布时间": row["pub_time"], - "情感指数": f"{row['sentiment']:.2f}" if row["sentiment"] is not None else "-", - "热度评分": f"{row['heat']:.2f}" if row["heat"] is not None else "-" - } - - # 如果有行业信息,添加到展示数据中 - industries = entities_info.get("industries", []) - if industries: - news_item["相关行业"] = "、".join(industries[:3]) # 只显示前3个行业 - - news_data.append(news_item) - - # 显示新闻表格 - news_df = pd.DataFrame(news_data) - # 确保所有列都是字符串类型,避免PyArrow序列化错误 - for col in news_df.columns: - news_df[col] = news_df[col].astype(str) - st.dataframe(news_df, width='stretch', hide_index=True) - - # 添加新闻详情展开视图 - st.write("详细新闻内容:") - for idx, row in enumerate(news_rows): - with st.expander(f"{idx+1}. 
{row['title']}", expanded=False): - st.write(f"**来源:** {row['source']}") - st.write(f"**发布时间:** {row['pub_time']}") - - # 解析entities获取更多详细信息 - entities_info = {} + changes.append({"代理": agent, "原动作": old_action, "新动作": new_action}) + if changes: + st.success("重评估完成,检测到动作变更:") + st.dataframe(pd.DataFrame(changes), hide_index=True, width='stretch') + else: + st.success("重评估完成,无动作变更。") + st.rerun() + except Exception as exc: # noqa: BLE001 + LOGGER.exception("重评估失败", extra=LOG_EXTRA) + st.error(f"重评估失败:{exc}") + if cols_re[1].button("批量重评估(所选)", key="reevaluate_batch", disabled=not batch_symbols): + with st.spinner("批量重评估执行中..."): try: - if row["entities"]: - entities_info = json.loads(row["entities"]) - except (json.JSONDecodeError, TypeError): - pass - - # 显示情感和热度信息 - sentiment_display = f"{row['sentiment']:.2f}" if row["sentiment"] is not None else "-" - heat_display = f"{row['heat']:.2f}" if row["heat"] is not None else "-" - st.write(f"**情感指数:** {sentiment_display} | **热度评分:** {heat_display}") - - # 显示行业信息 - industries = entities_info.get("industries", []) - if industries: - st.write(f"**相关行业:** {'、'.join(industries)}") - - # 显示重要关键词 - important_keywords = entities_info.get("important_keywords", []) - if important_keywords: - st.write(f"**重要关键词:** {'、'.join(important_keywords)}") - - # 显示URL链接(如果有) - url = entities_info.get("source_url", "") - if url: - st.markdown(f"[查看原文]({url})", unsafe_allow_html=True) - else: - st.info(f"近7天内暂无关于 {ts_code} 的新闻。") - except Exception as e: - LOGGER.exception("获取新闻数据失败", extra=LOG_EXTRA) - st.error(f"获取新闻数据时发生错误:{e}") + trade_date_obj = None + try: + trade_date_obj = date.fromisoformat(str(trade_date)) + except Exception: + try: + trade_date_obj = datetime.strptime(str(trade_date), "%Y%m%d").date() + except Exception: + pass + if trade_date_obj is None: + raise ValueError(f"无法解析交易日:{trade_date}") + progress = st.progress(0.0) + changes_all: List[Dict[str, object]] = [] + for idx, code in enumerate(batch_symbols, start=1): + with db_session(read_only=True) as conn: + before_rows = conn.execute( + "SELECT agent, action FROM agent_utils WHERE trade_date = ? AND ts_code = ?", + (trade_date, code), + ).fetchall() + before_map = {row["agent"]: row["action"] for row in before_rows} + cfg = BtConfig( + id="reeval_ui_batch", + name="UI Batch Re-eval", + start_date=trade_date_obj, + end_date=trade_date_obj, + universe=[code], + params={}, + ) + engine = BacktestEngine(cfg) + state = PortfolioState() + _ = engine.simulate_day(trade_date_obj, state) + with db_session(read_only=True) as conn: + after_rows = conn.execute( + "SELECT agent, action FROM agent_utils WHERE trade_date = ? 
AND ts_code = ?", + (trade_date, code), + ).fetchall() + for row in after_rows: + agent = row["agent"] + new_action = row["action"] + old_action = before_map.get(agent) + if new_action != old_action: + changes_all.append({"代码": code, "代理": agent, "原动作": old_action, "新动作": new_action}) + progress.progress(idx / max(1, len(batch_symbols))) + st.success("批量重评估完成。") + if changes_all: + st.dataframe(pd.DataFrame(changes_all), hide_index=True, width='stretch') + st.rerun() + except Exception as exc: # noqa: BLE001 + LOGGER.exception("批量重评估失败", extra=LOG_EXTRA) + st.error(f"批量重评估失败:{exc}") - st.divider() - # 提示用户跳转到单独的“投资池与仓位概览”页 - st.info("投资池与仓位概览已移至单独页面。请在侧边或页面导航中选择“投资池/仓位”以查看详细信息。") - - st.divider() - st.subheader("策略重评估") - st.caption("对当前选中的交易日与标的,立即触发一次策略评估并回写 agent_utils。") - cols_re = st.columns([1,1]) - if cols_re[0].button("对该标的重评估", key="reevaluate_current_symbol"): - with st.spinner("正在重评估..."): - try: - trade_date_obj = None - try: - trade_date_obj = date.fromisoformat(str(trade_date)) - except Exception: - try: - trade_date_obj = datetime.strptime(str(trade_date), "%Y%m%d").date() - except Exception: - pass - if trade_date_obj is None: - raise ValueError(f"无法解析交易日:{trade_date}") - # snapshot before - with db_session(read_only=True) as conn: - before_rows = conn.execute( - """ - SELECT agent, action, utils FROM agent_utils - WHERE trade_date = ? AND ts_code = ? - """, - (trade_date, ts_code), - ).fetchall() - before_map = {row["agent"]: (row["action"], row["utils"]) for row in before_rows} - cfg = BtConfig( - id="reeval_ui", - name="UI Re-evaluation", - start_date=trade_date_obj, - end_date=trade_date_obj, - universe=[ts_code], - params={}, - ) - engine = BacktestEngine(cfg) - state = PortfolioState() - _ = engine.simulate_day(trade_date_obj, state) - # compare after - with db_session(read_only=True) as conn: - after_rows = conn.execute( - """ - SELECT agent, action, utils FROM agent_utils - WHERE trade_date = ? AND ts_code = ? - """, - (trade_date, ts_code), - ).fetchall() - changes = [] - for row in after_rows: - agent = row["agent"] - new_action = row["action"] - old_action, _old_utils = before_map.get(agent, (None, None)) - if new_action != old_action: - changes.append({"代理": agent, "原动作": old_action, "新动作": new_action}) - if changes: - st.success("重评估完成,检测到动作变更:") - st.dataframe(pd.DataFrame(changes), hide_index=True, width='stretch') - else: - st.success("重评估完成,无动作变更。") - st.rerun() - except Exception as exc: # noqa: BLE001 - LOGGER.exception("重评估失败", extra=LOG_EXTRA) - st.error(f"重评估失败:{exc}") - if cols_re[1].button("批量重评估(所选)", key="reevaluate_batch", disabled=not batch_symbols): - with st.spinner("批量重评估执行中..."): - try: - trade_date_obj = None - try: - trade_date_obj = date.fromisoformat(str(trade_date)) - except Exception: - try: - trade_date_obj = datetime.strptime(str(trade_date), "%Y%m%d").date() - except Exception: - pass - if trade_date_obj is None: - raise ValueError(f"无法解析交易日:{trade_date}") - progress = st.progress(0.0) - changes_all: List[Dict[str, object]] = [] - for idx, code in enumerate(batch_symbols, start=1): - with db_session(read_only=True) as conn: - before_rows = conn.execute( - "SELECT agent, action FROM agent_utils WHERE trade_date = ? 
AND ts_code = ?", - (trade_date, code), - ).fetchall() - before_map = {row["agent"]: row["action"] for row in before_rows} - cfg = BtConfig( - id="reeval_ui_batch", - name="UI Batch Re-eval", - start_date=trade_date_obj, - end_date=trade_date_obj, - universe=[code], - params={}, - ) - engine = BacktestEngine(cfg) - state = PortfolioState() - _ = engine.simulate_day(trade_date_obj, state) - with db_session(read_only=True) as conn: - after_rows = conn.execute( - "SELECT agent, action FROM agent_utils WHERE trade_date = ? AND ts_code = ?", - (trade_date, code), - ).fetchall() - for row in after_rows: - agent = row["agent"] - new_action = row["action"] - old_action = before_map.get(agent) - if new_action != old_action: - changes_all.append({"代码": code, "代理": agent, "原动作": old_action, "新动作": new_action}) - progress.progress(idx / max(1, len(batch_symbols))) - st.success("批量重评估完成。") - if changes_all: - st.dataframe(pd.DataFrame(changes_all), hide_index=True, width='stretch') - st.rerun() - except Exception as exc: # noqa: BLE001 - LOGGER.exception("批量重评估失败", extra=LOG_EXTRA) - st.error(f"批量重评估失败:{exc}") - - -def render_log_viewer() -> None: - """渲染日志钻取与历史对比视图页面。""" - LOGGER.info("渲染日志视图页面", extra=LOG_EXTRA) - st.header("日志钻取与历史对比") - st.write("查看系统运行日志,支持时间范围筛选、关键词搜索和历史对比功能。") - - # 日志时间范围选择 - col1, col2 = st.columns(2) - with col1: - start_date = st.date_input("开始日期", value=date.today() - timedelta(days=7)) - with col2: - end_date = st.date_input("结束日期", value=date.today()) - - # 日志级别筛选 - log_levels = ["ALL", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] - selected_level = st.selectbox("日志级别", log_levels, index=1) - - # 关键词搜索 - search_query = st.text_input("搜索关键词") - - # 阶段筛选 - with db_session(read_only=True) as conn: - stages = [row["stage"] for row in conn.execute("SELECT DISTINCT stage FROM run_log").fetchall()] - stages = [s for s in stages if s] # 过滤空值 - stages.insert(0, "ALL") - selected_stage = st.selectbox("执行阶段", stages) - - # 查询日志 - with st.spinner("加载日志数据中..."): - try: - with db_session(read_only=True) as conn: - query_parts = ["SELECT ts, stage, level, msg FROM run_log WHERE 1=1"] - params = [] - - # 添加日期过滤 - start_ts = f"{start_date.isoformat()}T00:00:00Z" - end_ts = f"{end_date.isoformat()}T23:59:59Z" - query_parts.append("AND ts BETWEEN ? 
AND ?") - params.extend([start_ts, end_ts]) - - # 添加级别过滤 - if selected_level != "ALL": - query_parts.append("AND level = ?") - params.append(selected_level) - - # 添加关键词过滤 - if search_query: - query_parts.append("AND msg LIKE ?") - params.append(f"%{search_query}%") - - # 添加阶段过滤 - if selected_stage != "ALL": - query_parts.append("AND stage = ?") - params.append(selected_stage) - - # 添加排序 - query_parts.append("ORDER BY ts DESC") - - # 执行查询 - query = " ".join(query_parts) - rows = conn.execute(query, params).fetchall() - - # 转换为DataFrame - if rows: - # 将sqlite3.Row对象转换为字典列表 - rows_dict = [{key: row[key] for key in row.keys()} for row in rows] - log_df = pd.DataFrame(rows_dict) - # 格式化时间戳并确保数据类型一致 - log_df["ts"] = pd.to_datetime(log_df["ts"]).dt.strftime("%Y-%m-%d %H:%M:%S") - # 确保所有列都是字符串类型,避免PyArrow序列化错误 - for col in log_df.columns: - log_df[col] = log_df[col].astype(str) - else: - log_df = pd.DataFrame(columns=["ts", "stage", "level", "msg"]) - - # 显示日志表格 - st.dataframe( - log_df, - hide_index=True, - width="stretch", - column_config={ - "ts": st.column_config.TextColumn("时间"), - "stage": st.column_config.TextColumn("执行阶段"), - "level": st.column_config.TextColumn("日志级别"), - "msg": st.column_config.TextColumn("日志消息", width="large") - } - ) - - # 下载功能 - if not log_df.empty: - csv_data = log_df.to_csv(index=False).encode('utf-8') - st.download_button( - label="下载日志CSV", - data=csv_data, - file_name=f"logs_{start_date}_{end_date}.csv", - mime="text/csv", - key="download_logs" - ) - - # JSON下载 - json_data = log_df.to_json(orient='records', force_ascii=False, indent=2) - st.download_button( - label="下载日志JSON", - data=json_data, - file_name=f"logs_{start_date}_{end_date}.json", - mime="application/json", - key="download_logs_json" - ) - except Exception as e: - LOGGER.exception("加载日志失败", extra=LOG_EXTRA) - st.error(f"加载日志数据失败:{e}") - - # 历史对比功能 - st.subheader("历史对比") - st.write("选择两个时间点的日志进行对比分析。") - - # 第一个时间点选择 - col3, col4 = st.columns(2) - with col3: - compare_date1 = st.date_input("对比日期1", value=date.today() - timedelta(days=1)) - with col4: - compare_date2 = st.date_input("对比日期2", value=date.today()) def render_pool_overview() -> None: @@ -2872,6 +2716,161 @@ def render_tests() -> None: st.write(response) +def render_log_viewer() -> None: + """渲染日志钻取与历史对比视图页面。""" + LOGGER.info("渲染日志视图页面", extra=LOG_EXTRA) + st.header("日志钻取与历史对比") + st.write("查看系统运行日志,支持时间范围筛选、关键词搜索和历史对比功能。") + + col1, col2 = st.columns(2) + with col1: + start_date = st.date_input("开始日期", value=date.today() - timedelta(days=7)) + with col2: + end_date = st.date_input("结束日期", value=date.today()) + + log_levels = ["ALL", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] + selected_level = st.selectbox("日志级别", log_levels, index=1) + + search_query = st.text_input("搜索关键词") + + with db_session(read_only=True) as conn: + stages = [row["stage"] for row in conn.execute("SELECT DISTINCT stage FROM run_log").fetchall()] + stages = [s for s in stages if s] + stages.insert(0, "ALL") + selected_stage = st.selectbox("执行阶段", stages) + + with st.spinner("加载日志数据中..."): + try: + with db_session(read_only=True) as conn: + query_parts = ["SELECT ts, stage, level, msg FROM run_log WHERE 1=1"] + params: list[object] = [] + + start_ts = f"{start_date.isoformat()}T00:00:00Z" + end_ts = f"{end_date.isoformat()}T23:59:59Z" + query_parts.append("AND ts BETWEEN ? 
AND ?") + params.extend([start_ts, end_ts]) + + if selected_level != "ALL": + query_parts.append("AND level = ?") + params.append(selected_level) + + if search_query: + query_parts.append("AND msg LIKE ?") + params.append(f"%{search_query}%") + + if selected_stage != "ALL": + query_parts.append("AND stage = ?") + params.append(selected_stage) + + query_parts.append("ORDER BY ts DESC") + + query = " ".join(query_parts) + rows = conn.execute(query, params).fetchall() + + if rows: + rows_dict = [{key: row[key] for key in row.keys()} for row in rows] + log_df = pd.DataFrame(rows_dict) + log_df["ts"] = pd.to_datetime(log_df["ts"]).dt.strftime("%Y-%m-%d %H:%M:%S") + for col in log_df.columns: + log_df[col] = log_df[col].astype(str) + else: + log_df = pd.DataFrame(columns=["ts", "stage", "level", "msg"]) + + st.dataframe( + log_df, + hide_index=True, + width="stretch", + column_config={ + "ts": st.column_config.TextColumn("时间"), + "stage": st.column_config.TextColumn("执行阶段"), + "level": st.column_config.TextColumn("日志级别"), + "msg": st.column_config.TextColumn("日志消息", width="large"), + }, + ) + + if not log_df.empty: + csv_data = log_df.to_csv(index=False).encode("utf-8") + st.download_button( + label="下载日志CSV", + data=csv_data, + file_name=f"logs_{start_date}_{end_date}.csv", + mime="text/csv", + key="download_logs", + ) + + json_data = log_df.to_json(orient="records", force_ascii=False, indent=2) + st.download_button( + label="下载日志JSON", + data=json_data, + file_name=f"logs_{start_date}_{end_date}.json", + mime="application/json", + key="download_logs_json", + ) + except Exception as exc: # noqa: BLE001 + LOGGER.exception("加载日志失败", extra=LOG_EXTRA) + st.error(f"加载日志数据失败:{exc}") + + st.subheader("历史对比") + st.write("选择两个时间点的日志进行对比分析。") + + col3, col4 = st.columns(2) + with col3: + compare_date1 = st.date_input("对比日期1", value=date.today() - timedelta(days=1)) + with col4: + compare_date2 = st.date_input("对比日期2", value=date.today()) + + comparison_stage = st.selectbox("对比阶段", stages, key="compare_stage") + st.write("选择需要比较的日志数量。") + compare_limit = st.slider("对比日志数量", min_value=10, max_value=200, value=50, step=10) + + if st.button("生成历史对比报告"): + with st.spinner("生成对比报告中..."): + try: + with db_session(read_only=True) as conn: + def load_logs(d: date) -> pd.DataFrame: + start_ts = f"{d.isoformat()}T00:00:00Z" + end_ts = f"{d.isoformat()}T23:59:59Z" + query = ["SELECT ts, level, msg FROM run_log WHERE ts BETWEEN ? 
AND ?"] + params: list[object] = [start_ts, end_ts] + if comparison_stage != "ALL": + query.append("AND stage = ?") + params.append(comparison_stage) + query.append("ORDER BY ts DESC LIMIT ?") + params.append(compare_limit) + sql = " ".join(query) + rows = conn.execute(sql, params).fetchall() + if not rows: + return pd.DataFrame(columns=["ts", "level", "msg"]) + df = pd.DataFrame([{k: row[k] for k in row.keys()} for row in rows]) + df["ts"] = pd.to_datetime(df["ts"]).dt.strftime("%Y-%m-%d %H:%M:%S") + return df + + df1 = load_logs(compare_date1) + df2 = load_logs(compare_date2) + + if df1.empty and df2.empty: + st.info("选定日期暂无日志可对比。") + else: + st.write("### 对比结果") + col_a, col_b = st.columns(2) + with col_a: + st.write(f"{compare_date1} 日日志") + st.dataframe(df1, hide_index=True, width="stretch") + with col_b: + st.write(f"{compare_date2} 日日志") + st.dataframe(df2, hide_index=True, width="stretch") + + summary = { + "日期1日志条数": int(len(df1)), + "日期2日志条数": int(len(df2)), + "新增日志条数": max(len(df2) - len(df1), 0), + } + st.write("摘要:", summary) + except Exception as exc: # noqa: BLE001 + LOGGER.exception("历史对比生成失败", extra=LOG_EXTRA) + st.error(f"生成历史对比失败:{exc}") + + def render_data_settings() -> None: """渲染数据源配置界面.""" st.subheader("Tushare 数据源")