From 1a99b72c60d08f51344f84cc2aff3ea172d0ae6e Mon Sep 17 00:00:00 2001 From: sam Date: Sun, 28 Sep 2025 09:39:48 +0800 Subject: [PATCH] update --- README.md | 30 ++++- app/agents/base.py | 6 +- app/agents/departments.py | 167 +++++++++++++++++++++++ app/agents/game.py | 120 ++++++++++++++++- app/backtest/engine.py | 107 ++++++++++++++- app/llm/client.py | 82 +++++++++++- app/llm/prompts.py | 41 +++++- app/ui/streamlit_app.py | 270 +++++++++++++++++++++++++++++++++++++- app/utils/config.py | 93 ++++++++++++- requirements.txt | 1 + 10 files changed, 884 insertions(+), 33 deletions(-) create mode 100644 app/agents/departments.py diff --git a/README.md b/README.md index 082d751..f986fec 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ - **可视化与解释**:`app/ui/streamlit_app.py` 提供四大页签(今日计划、回测与复盘、数据与设置、自检测试),结合 Plotly 图形展示和 `app/llm` 提示卡片生成器,支撑人机协作分析。 - **统一日志与持久化**:SQLite 统一存储行情、回测与日志,配合 `DatabaseLogHandler` 在 UI/抓数流程中输出结构化运行轨迹,支持快速追踪与复盘。 - **跨市场数据扩展**:`app/ingest/tushare.py` 追加指数、ETF/公募基金、期货、外汇、港股与美股的增量拉取逻辑,确保多资产因子与宏观代理所需的行情基础数据齐备。 +- **部门化多模型协作**:`app/agents/departments.py` 封装部门级 LLM 调度,`app/llm/client.py` 支持 single/majority/leader 策略,部门结论在 `app/agents/game.py` 与六类基础代理共同博弈,并持久化至 `agent_utils` 供 UI 展示。 ## LLM + 多智能体最佳实践 @@ -85,10 +86,35 @@ Streamlit `自检测试` 页签提供: ## 下一步 1. 在 `app/features` 和 `app/backtest` 中完善信号计算、事件驱动撮合与绩效指标输出。 -2. 将代理效用写入 SQLite 的 `agent_utils` 和 `alloc_log` 表,驱动 UI 决策解释。 +2. 丰富 `DepartmentContext`(行情快照、风险指标),让部门评估拥有更完整的上下文。 3. 使用轻量情感分析与热度计算填充 `news`、`heat_daily` 与热点指数。 -4. 接入本地小模型或 API 完成人类可读的策略建议卡片,形成端到端体验。 +4. 在 Streamlit 今日计划页增加“重新评估”与日志追踪能力,串联实时调度链路。 ## License 本项目采用定制的 “LLM Quant Framework License v1.0”。个人使用、修改与分发需保留出处,任何商业用途须事先与版权方协商并签署付费协议。详情参见仓库根目录的 `LICENSE` 文件。 + +## 多智能体 LLM 投资流程 + +- **部门化结构**:动量、价值、新闻、流动性、宏观、风险等代理视作独立业务部门,利用项目现有的数据/特征处理流程向每个部门提供上下文。 +- **多 LLM 协作**:每个部门内部可配置多家 LLM 提供商(如 DeepSeek、OpenAI、文心等)作为智能体助手,分别生成分析意见和风险提示;可通过多数投票、仲裁等策略确定部门结论。 +- **部门输出**:统一返回部门行动(买入/卖出/持有)、信心水平以及核心理由 (context + LLM 摘要),当前实现会将摘要、风险提示与票权写入 `agent_utils`。 +- **跨部门协调**:沿用 `app/agents/game.py` 的纳什谈判/投票结构,将各部门的结论与六类基础代理共同建模,必要时触发冲突检测并标记复核。 +- **日志与可视化**:Streamlit 今日计划页读取 `agent_utils` 展示部门意见、投票细节与全局行动,可快速核查部门分歧与置信度。 + +## 实施步骤 + +1. **配置扩展** (`app/utils/config.py` + `config.json`) ✅ + - 部门支持 primary/ensemble、策略(single/majority/leader)、权重,并可在 Streamlit 中编辑主要字段。 + +2. **部门管控器** ✅ + - `app/agents/departments.py` 提供 `DepartmentAgent`/`DepartmentManager`,封装 Prompt 构建、多模型协商及异常回退。 + +3. **集成决策链** ✅ + - `app/agents/game.py` 将部门评分嵌入纳什谈判/加权投票,并对冲突设置复核标记;`app/backtest/engine.py` 将结果落库。 + +4. **UI 与日志**(进行中) + - 今日计划页展示部门意见、票权与全局策略,后续补充一键重评估、日志钻取。 + +5. 
**测试与验证**(待补充) + - 需完善部门上下文构造与多模型调用的单元/集成测试,结合回测指标对比多 LLM 策略收益差异。 diff --git a/app/agents/base.py b/app/agents/base.py index d78d6eb..2f913b7 100644 --- a/app/agents/base.py +++ b/app/agents/base.py @@ -1,9 +1,9 @@ """Agent abstractions for the multi-agent decision engine.""" from __future__ import annotations -from dataclasses import dataclass +from dataclasses import dataclass, field from enum import Enum -from typing import Dict, Mapping +from typing import Any, Dict, Mapping class AgentAction(str, Enum): @@ -19,6 +19,8 @@ class AgentContext: ts_code: str trade_date: str features: Mapping[str, float] + market_snapshot: Mapping[str, Any] = field(default_factory=dict) + raw: Mapping[str, Any] = field(default_factory=dict) class Agent: diff --git a/app/agents/departments.py b/app/agents/departments.py new file mode 100644 index 0000000..7cd7b35 --- /dev/null +++ b/app/agents/departments.py @@ -0,0 +1,167 @@ +"""Department-level LLM agents coordinating multi-model decisions.""" +from __future__ import annotations + +import json +from dataclasses import dataclass, field +from typing import Any, Dict, List, Mapping + +from app.agents.base import AgentAction +from app.llm.client import run_llm_with_config +from app.llm.prompts import department_prompt +from app.utils.config import DepartmentSettings +from app.utils.logging import get_logger + +LOGGER = get_logger(__name__) +LOG_EXTRA = {"stage": "department"} + + +@dataclass +class DepartmentContext: + """Structured data fed into a department for decision making.""" + + ts_code: str + trade_date: str + features: Mapping[str, Any] = field(default_factory=dict) + market_snapshot: Mapping[str, Any] = field(default_factory=dict) + raw: Mapping[str, Any] = field(default_factory=dict) + + +@dataclass +class DepartmentDecision: + """Result produced by a department agent.""" + + department: str + action: AgentAction + confidence: float + summary: str + raw_response: str + signals: List[str] = field(default_factory=list) + risks: List[str] = field(default_factory=list) + + def to_dict(self) -> Dict[str, Any]: + return { + "department": self.department, + "action": self.action.value, + "confidence": self.confidence, + "summary": self.summary, + "signals": self.signals, + "risks": self.risks, + "raw_response": self.raw_response, + } + + +class DepartmentAgent: + """Wraps LLM ensemble logic for a single analytical department.""" + + def __init__(self, settings: DepartmentSettings) -> None: + self.settings = settings + + def analyze(self, context: DepartmentContext) -> DepartmentDecision: + prompt = department_prompt(self.settings, context) + system_prompt = ( + "你是一个多智能体量化投研系统中的分部决策者,需要根据提供的结构化信息给出买卖意见。" + ) + try: + response = run_llm_with_config(self.settings.llm, prompt, system=system_prompt) + except Exception as exc: # noqa: BLE001 + LOGGER.exception("部门 %s 调用 LLM 失败:%s", self.settings.code, exc, extra=LOG_EXTRA) + return DepartmentDecision( + department=self.settings.code, + action=AgentAction.HOLD, + confidence=0.0, + summary=f"LLM 调用失败:{exc}", + raw_response=str(exc), + ) + + decision_data = _parse_department_response(response) + action = _normalize_action(decision_data.get("action")) + confidence = _clamp_float(decision_data.get("confidence"), default=0.5) + summary = decision_data.get("summary") or decision_data.get("reason") or "" + signals = decision_data.get("signals") or decision_data.get("rationale") or [] + if isinstance(signals, str): + signals = [signals] + risks = decision_data.get("risks") or decision_data.get("warnings") or [] + if 
isinstance(risks, str): + risks = [risks] + + decision = DepartmentDecision( + department=self.settings.code, + action=action, + confidence=confidence, + summary=summary or "未提供摘要", + signals=[str(sig) for sig in signals if sig], + risks=[str(risk) for risk in risks if risk], + raw_response=response, + ) + LOGGER.debug( + "部门 %s 决策:action=%s confidence=%.2f", + self.settings.code, + decision.action.value, + decision.confidence, + extra=LOG_EXTRA, + ) + return decision + + +class DepartmentManager: + """Orchestrates all departments defined in configuration.""" + + def __init__(self, departments: Mapping[str, DepartmentSettings]) -> None: + self.agents: Dict[str, DepartmentAgent] = { + code: DepartmentAgent(settings) + for code, settings in departments.items() + } + + def evaluate(self, context: DepartmentContext) -> Dict[str, DepartmentDecision]: + results: Dict[str, DepartmentDecision] = {} + for code, agent in self.agents.items(): + results[code] = agent.analyze(context) + return results + + +def _parse_department_response(text: str) -> Dict[str, Any]: + """Extract a JSON object from the LLM response if possible.""" + + cleaned = text.strip() + candidate = None + if cleaned.startswith("{") and cleaned.endswith("}"): + candidate = cleaned + else: + start = cleaned.find("{") + end = cleaned.rfind("}") + if start != -1 and end != -1 and end > start: + candidate = cleaned[start : end + 1] + if candidate: + try: + return json.loads(candidate) + except json.JSONDecodeError: + LOGGER.debug("部门响应 JSON 解析失败,返回原始文本", extra=LOG_EXTRA) + return {"summary": cleaned} + + +def _normalize_action(value: Any) -> AgentAction: + if isinstance(value, str): + upper = value.strip().upper() + mapping = { + "BUY": AgentAction.BUY_M, + "BUY_S": AgentAction.BUY_S, + "BUY_M": AgentAction.BUY_M, + "BUY_L": AgentAction.BUY_L, + "SELL": AgentAction.SELL, + "HOLD": AgentAction.HOLD, + } + if upper in mapping: + return mapping[upper] + if "SELL" in upper: + return AgentAction.SELL + if "BUY" in upper: + return AgentAction.BUY_M + return AgentAction.HOLD + + +def _clamp_float(value: Any, default: float = 0.5) -> float: + try: + num = float(value) + except (TypeError, ValueError): + return default + return max(0.0, min(1.0, num)) diff --git a/app/agents/game.py b/app/agents/game.py index 31b6659..d8b8869 100644 --- a/app/agents/game.py +++ b/app/agents/game.py @@ -1,11 +1,12 @@ """Multi-agent decision game implementation.""" from __future__ import annotations -from dataclasses import dataclass -from math import exp, log -from typing import Dict, Iterable, List, Mapping, Tuple +from dataclasses import dataclass, field +from math import log +from typing import Dict, Iterable, List, Mapping, Optional, Tuple from .base import Agent, AgentAction, AgentContext, UtilityMatrix +from .departments import DepartmentContext, DepartmentDecision, DepartmentManager from .registry import weight_map @@ -29,6 +30,9 @@ class Decision: target_weight: float feasible_actions: List[AgentAction] utilities: UtilityMatrix + department_decisions: Dict[str, DepartmentDecision] = field(default_factory=dict) + department_votes: Dict[str, float] = field(default_factory=dict) + requires_review: bool = False def compute_utilities(agents: Iterable[Agent], context: AgentContext) -> UtilityMatrix: @@ -105,16 +109,56 @@ def target_weight_for_action(action: AgentAction) -> float: return mapping[action] -def decide(context: AgentContext, agents: Iterable[Agent], weights: Mapping[str, float], method: str = "nash") -> Decision: +def decide( + context: 
AgentContext, + agents: Iterable[Agent], + weights: Mapping[str, float], + method: str = "nash", + department_manager: Optional[DepartmentManager] = None, + department_context: Optional[DepartmentContext] = None, +) -> Decision: agent_list = list(agents) - norm_weights = weight_map(dict(weights)) utilities = compute_utilities(agent_list, context) feas_actions = feasible_actions(agent_list, context) if not feas_actions: - return Decision(AgentAction.HOLD, 0.0, 0.0, [], utilities) + return Decision( + action=AgentAction.HOLD, + confidence=0.0, + target_weight=0.0, + feasible_actions=[], + utilities=utilities, + ) + + raw_weights = dict(weights) + department_decisions: Dict[str, DepartmentDecision] = {} + department_votes: Dict[str, float] = {} + + if department_manager: + dept_context = department_context + if dept_context is None: + dept_context = DepartmentContext( + ts_code=context.ts_code, + trade_date=context.trade_date, + features=dict(context.features), + market_snapshot=dict(getattr(context, "market_snapshot", {}) or {}), + raw=dict(getattr(context, "raw", {}) or {}), + ) + department_decisions = department_manager.evaluate(dept_context) + for code, decision in department_decisions.items(): + agent_key = f"dept_{code}" + dept_agent = department_manager.agents.get(code) + weight = dept_agent.settings.weight if dept_agent else 1.0 + raw_weights[agent_key] = weight + scores = _department_scores(decision) + for action in ACTIONS: + utilities.setdefault(action, {})[agent_key] = scores[action] + bucket = _department_vote_bucket(decision.action) + if bucket: + department_votes[bucket] = department_votes.get(bucket, 0.0) + weight * decision.confidence filtered_utilities = {action: utilities[action] for action in feas_actions} hold_scores = utilities.get(AgentAction.HOLD, {}) + norm_weights = weight_map(raw_weights) if method == "vote": action, confidence = vote(filtered_utilities, norm_weights) @@ -124,4 +168,66 @@ def decide(context: AgentContext, agents: Iterable[Agent], weights: Mapping[str, action, confidence = vote(filtered_utilities, norm_weights) weight = target_weight_for_action(action) - return Decision(action, confidence, weight, feas_actions, utilities) + requires_review = _department_conflict_flag(department_votes) + return Decision( + action=action, + confidence=confidence, + target_weight=weight, + feasible_actions=feas_actions, + utilities=utilities, + department_decisions=department_decisions, + department_votes=department_votes, + requires_review=requires_review, + ) + + +def _department_scores(decision: DepartmentDecision) -> Dict[AgentAction, float]: + conf = _clamp(decision.confidence) + scores: Dict[AgentAction, float] = {action: 0.2 for action in ACTIONS} + if decision.action is AgentAction.SELL: + scores[AgentAction.SELL] = 0.7 + 0.3 * conf + scores[AgentAction.HOLD] = 0.4 * (1 - conf) + scores[AgentAction.BUY_S] = 0.2 * (1 - conf) + scores[AgentAction.BUY_M] = 0.15 * (1 - conf) + scores[AgentAction.BUY_L] = 0.1 * (1 - conf) + elif decision.action in {AgentAction.BUY_S, AgentAction.BUY_M, AgentAction.BUY_L}: + for action in (AgentAction.BUY_S, AgentAction.BUY_M, AgentAction.BUY_L): + if action is decision.action: + scores[action] = 0.6 + 0.4 * conf + else: + scores[action] = 0.3 + 0.3 * conf + scores[AgentAction.HOLD] = 0.3 * (1 - conf) + 0.25 + scores[AgentAction.SELL] = 0.15 * (1 - conf) + else: # HOLD 或未知 + scores[AgentAction.HOLD] = 0.6 + 0.4 * conf + scores[AgentAction.SELL] = 0.3 * (1 - conf) + scores[AgentAction.BUY_S] = 0.3 * (1 - conf) + 
scores[AgentAction.BUY_M] = 0.3 * (1 - conf) + scores[AgentAction.BUY_L] = 0.3 * (1 - conf) + return {action: _clamp(score) for action, score in scores.items()} + + +def _department_vote_bucket(action: AgentAction) -> str: + if action is AgentAction.SELL: + return "sell" + if action in {AgentAction.BUY_S, AgentAction.BUY_M, AgentAction.BUY_L}: + return "buy" + if action is AgentAction.HOLD: + return "hold" + return "" + + +def _department_conflict_flag(votes: Mapping[str, float]) -> bool: + if not votes: + return False + total = sum(votes.values()) + if total <= 0: + return True + top = max(votes.values()) + if top < total * 0.45: + return True + if len(votes) > 1: + sorted_votes = sorted(votes.values(), reverse=True) + if len(sorted_votes) >= 2 and (sorted_votes[0] - sorted_votes[1]) < total * 0.1: + return True + return False diff --git a/app/backtest/engine.py b/app/backtest/engine.py index 3876ea2..eff7b72 100644 --- a/app/backtest/engine.py +++ b/app/backtest/engine.py @@ -1,14 +1,22 @@ """Backtest engine skeleton for daily bar simulation.""" from __future__ import annotations +import json from dataclasses import dataclass, field from datetime import date from typing import Dict, Iterable, List, Mapping from app.agents.base import AgentContext +from app.agents.departments import DepartmentManager from app.agents.game import Decision, decide from app.agents.registry import default_agents +from app.utils.config import get_config from app.utils.db import db_session +from app.utils.logging import get_logger + + +LOGGER = get_logger(__name__) +LOG_EXTRA = {"stage": "backtest"} @dataclass @@ -40,7 +48,15 @@ class BacktestEngine: def __init__(self, cfg: BtConfig) -> None: self.cfg = cfg self.agents = default_agents() - self.weights = {agent.name: 1.0 for agent in self.agents} + app_cfg = get_config() + weight_config = app_cfg.agent_weights.as_dict() if app_cfg.agent_weights else {} + if weight_config: + self.weights = weight_config + else: + self.weights = {agent.name: 1.0 for agent in self.agents} + self.department_manager = ( + DepartmentManager(app_cfg.departments) if app_cfg.departments else None + ) def load_market_data(self, trade_date: date) -> Mapping[str, Dict[str, float]]: """Load per-stock feature vectors. Replace with real data access.""" @@ -53,7 +69,13 @@ class BacktestEngine: decisions: List[Decision] = [] for ts_code, features in feature_map.items(): context = AgentContext(ts_code=ts_code, trade_date=trade_date.isoformat(), features=features) - decision = decide(context, self.agents, self.weights, method=self.cfg.method) + decision = decide( + context, + self.agents, + self.weights, + method=self.cfg.method, + department_manager=self.department_manager, + ) decisions.append(decision) self.record_agent_state(context, decision) # TODO: translate decisions into fills, holdings, and NAV updates. 
@@ -66,9 +88,88 @@ class BacktestEngine: "ts_code": context.ts_code, "action": decision.action.value, "confidence": decision.confidence, + "department_votes": decision.department_votes, + "requires_review": decision.requires_review, + "departments": { + code: dept.to_dict() + for code, dept in decision.department_decisions.items() + }, } + combined_weights = dict(self.weights) + if self.department_manager: + for code, agent in self.department_manager.agents.items(): + key = f"dept_{code}" + combined_weights[key] = agent.settings.weight + + feasible_json = json.dumps( + [action.value for action in decision.feasible_actions], + ensure_ascii=False, + ) + rows = [] + for agent_name, weight in combined_weights.items(): + action_scores = { + action.value: float(decision.utilities.get(action, {}).get(agent_name, 0.0)) + for action in decision.utilities.keys() + } + best_action = decision.action.value + if action_scores: + best_action = max(action_scores.items(), key=lambda item: item[1])[0] + metadata: Dict[str, object] = {} + if agent_name.startswith("dept_"): + dept_code = agent_name.split("dept_", 1)[-1] + dept_decision = decision.department_decisions.get(dept_code) + if dept_decision: + metadata = { + "_summary": dept_decision.summary, + "_signals": dept_decision.signals, + "_risks": dept_decision.risks, + "_confidence": dept_decision.confidence, + } + payload_json = {**action_scores, **metadata} + rows.append( + ( + context.trade_date, + context.ts_code, + agent_name, + best_action, + json.dumps(payload_json, ensure_ascii=False), + feasible_json, + float(weight), + ) + ) + + global_payload = { + "_confidence": decision.confidence, + "_target_weight": decision.target_weight, + "_department_votes": decision.department_votes, + "_requires_review": decision.requires_review, + } + rows.append( + ( + context.trade_date, + context.ts_code, + "global", + decision.action.value, + json.dumps(global_payload, ensure_ascii=False), + feasible_json, + 1.0, + ) + ) + + try: + with db_session() as conn: + conn.executemany( + """ + INSERT OR REPLACE INTO agent_utils + (trade_date, ts_code, agent, action, utils, feasible, weight) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, + rows, + ) + except Exception: + LOGGER.exception("写入 agent_utils 失败", extra=LOG_EXTRA) _ = payload - # Implementation should persist into agent_utils and bt_trades. + # TODO: persist payload into bt_trades / audit tables when schema is ready. 
def run(self) -> BacktestResult: state = PortfolioState() diff --git a/app/llm/client.py b/app/llm/client.py index 130e4f1..1e6a88b 100644 --- a/app/llm/client.py +++ b/app/llm/client.py @@ -8,7 +8,13 @@ from typing import Dict, Iterable, List, Optional import requests -from app.utils.config import DEFAULT_LLM_BASE_URLS, DEFAULT_LLM_MODELS, LLMEndpoint, get_config +from app.utils.config import ( + DEFAULT_LLM_BASE_URLS, + DEFAULT_LLM_MODELS, + LLMConfig, + LLMEndpoint, + get_config, +) from app.utils.logging import get_logger LOGGER = get_logger(__name__) @@ -140,15 +146,13 @@ def _normalize_response(text: str) -> str: def run_llm(prompt: str, *, system: Optional[str] = None) -> str: - """Execute the configured LLM strategy with the given prompt.""" + """Execute the globally configured LLM strategy with the given prompt.""" settings = get_config().llm - if settings.strategy == "majority": - return _run_majority_vote(settings, prompt, system) - return _call_endpoint(settings.primary, prompt, system) + return run_llm_with_config(settings, prompt, system=system) -def _run_majority_vote(config, prompt: str, system: Optional[str]) -> str: +def _run_majority_vote(config: LLMConfig, prompt: str, system: Optional[str]) -> str: endpoints: List[LLMEndpoint] = [config.primary] + list(config.ensemble) responses: List[Dict[str, str]] = [] failures: List[str] = [] @@ -199,6 +203,72 @@ def _run_majority_vote(config, prompt: str, system: Optional[str]) -> str: return responses[0]["raw"] +def _run_leader_follow(config: LLMConfig, prompt: str, system: Optional[str]) -> str: + advisors: List[Dict[str, str]] = [] + for endpoint in config.ensemble: + try: + raw = _call_endpoint(endpoint, prompt, system) + advisors.append( + { + "provider": endpoint.provider, + "model": endpoint.model or "", + "raw": raw, + } + ) + except Exception as exc: # noqa: BLE001 + LOGGER.warning( + "顾问模型调用失败:%s:%s -> %s", + endpoint.provider, + endpoint.model, + exc, + extra=LOG_EXTRA, + ) + + if not advisors: + LOGGER.info("领导者策略顾问为空,回退至主模型", extra=LOG_EXTRA) + return _call_endpoint(config.primary, prompt, system) + + advisor_chunks = [] + for idx, record in enumerate(advisors, start=1): + snippet = record["raw"].strip() + if len(snippet) > 1200: + snippet = snippet[:1200] + "..." 
+ advisor_chunks.append( + f"顾问#{idx} ({record['provider']}:{record['model']}):\n{snippet}" + ) + advisor_section = "\n\n".join(advisor_chunks) + leader_prompt = ( + "【顾问模型意见】\n" + f"{advisor_section}\n\n" + "请在充分参考顾问模型观点的基础上,保持原始指令的输出格式进行最终回答。\n\n" + f"{prompt}" + ) + LOGGER.info( + "领导者策略触发:顾问数量=%s", + len(advisors), + extra=LOG_EXTRA, + ) + return _call_endpoint(config.primary, leader_prompt, system) + + +def run_llm_with_config( + config: LLMConfig, + prompt: str, + *, + system: Optional[str] = None, +) -> str: + """Execute an LLM request using the provided configuration block.""" + + strategy = (config.strategy or "single").lower() + if strategy == "leader-follower": + strategy = "leader" + if strategy == "majority": + return _run_majority_vote(config, prompt, system) + if strategy == "leader": + return _run_leader_follow(config, prompt, system) + return _call_endpoint(config.primary, prompt, system) + + def llm_config_snapshot() -> Dict[str, object]: """Return a sanitized snapshot of current LLM configuration for debugging.""" diff --git a/app/llm/prompts.py b/app/llm/prompts.py index 96cd99f..53d980c 100644 --- a/app/llm/prompts.py +++ b/app/llm/prompts.py @@ -1,7 +1,11 @@ """Prompt templates for natural language outputs.""" from __future__ import annotations -from typing import Dict +from typing import Dict, TYPE_CHECKING + +if TYPE_CHECKING: # pragma: no cover + from app.utils.config import DepartmentSettings + from app.agents.departments import DepartmentContext def plan_prompt(data: Dict) -> str: @@ -9,3 +13,38 @@ def plan_prompt(data: Dict) -> str: _ = data return "你是一个投资助理,请根据提供的数据给出三条要点和两条风险提示。" + + +def department_prompt(settings: "DepartmentSettings", context: "DepartmentContext") -> str: + """Compose a structured prompt for department-level LLM ensemble.""" + + feature_lines = "\n".join( + f"- {key}: {value}" for key, value in sorted(context.features.items()) + ) + market_lines = "\n".join( + f"- {key}: {value}" for key, value in sorted(context.market_snapshot.items()) + ) + + instructions = f""" +部门名称:{settings.title} +股票代码:{context.ts_code} +交易日:{context.trade_date} + +【核心特征】 +{feature_lines or '- (无)'} + +【市场背景】 +{market_lines or '- (无)'} + +请基于以上数据给出该部门对当前股票的操作建议。输出必须是 JSON,字段如下: +{{ + "action": "BUY|BUY_S|BUY_M|BUY_L|SELL|HOLD", + "confidence": 0-1 之间的小数,表示信心, + "summary": "一句话概括理由", + "signals": ["详细要点", "..."], + "risks": ["风险点", "..."] +}} + +请严格返回单个 JSON 对象,不要添加额外文本。 +""" + return instructions.strip() diff --git a/app/ui/streamlit_app.py b/app/ui/streamlit_app.py index e232bac..330b622 100644 --- a/app/ui/streamlit_app.py +++ b/app/ui/streamlit_app.py @@ -5,12 +5,14 @@ import sys from dataclasses import asdict from datetime import date, timedelta from pathlib import Path -from typing import List +from typing import Dict, List ROOT = Path(__file__).resolve().parents[2] if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) +import json + import pandas as pd import plotly.express as px import plotly.graph_objects as go @@ -21,11 +23,12 @@ from app.data.schema import initialize_database from app.ingest.checker import run_boot_check from app.ingest.tushare import FetchJob, run_ingestion from app.llm.client import llm_config_snapshot, run_llm -from app.llm.explain import make_human_card from app.utils.config import ( + ALLOWED_LLM_STRATEGIES, DEFAULT_LLM_BASE_URLS, DEFAULT_LLM_MODEL_OPTIONS, DEFAULT_LLM_MODELS, + DepartmentSettings, LLMEndpoint, get_config, save_config, @@ -102,10 +105,153 @@ def _load_daily_frame(ts_code: str, start: date, end: date) -> 
pd.DataFrame: def render_today_plan() -> None: LOGGER.info("渲染今日计划页面", extra=LOG_EXTRA) st.header("今日计划") - st.write("待接入候选池筛选与多智能体决策结果。") - sample = make_human_card("000001.SZ", "2025-01-01", {"decisions": []}) - LOGGER.debug("示例卡片内容:%s", sample, extra=LOG_EXTRA) - st.json(sample) + try: + with db_session(read_only=True) as conn: + date_rows = conn.execute( + """ + SELECT DISTINCT trade_date + FROM agent_utils + ORDER BY trade_date DESC + LIMIT 30 + """ + ).fetchall() + except Exception: # noqa: BLE001 + LOGGER.exception("加载 agent_utils 失败", extra=LOG_EXTRA) + st.warning("暂未写入部门/代理决策,请先运行回测或策略评估流程。") + return + + trade_dates = [row["trade_date"] for row in date_rows] + if not trade_dates: + st.info("暂无决策记录,完成一次回测后即可在此查看部门意见与投票结果。") + return + + trade_date = st.selectbox("交易日", trade_dates, index=0) + + with db_session(read_only=True) as conn: + code_rows = conn.execute( + """ + SELECT DISTINCT ts_code + FROM agent_utils + WHERE trade_date = ? + ORDER BY ts_code + """, + (trade_date,), + ).fetchall() + symbols = [row["ts_code"] for row in code_rows] + if not symbols: + st.info("所选交易日暂无 agent_utils 记录。") + return + + ts_code = st.selectbox("标的", symbols, index=0) + + with db_session(read_only=True) as conn: + rows = conn.execute( + """ + SELECT agent, action, utils, feasible, weight + FROM agent_utils + WHERE trade_date = ? AND ts_code = ? + ORDER BY CASE WHEN agent = 'global' THEN 1 ELSE 0 END, agent + """, + (trade_date, ts_code), + ).fetchall() + + if not rows: + st.info("未查询到详细决策记录,稍后再试。") + return + + try: + feasible_actions = json.loads(rows[0]["feasible"] or "[]") + except (KeyError, TypeError, json.JSONDecodeError): + feasible_actions = [] + + global_info = None + dept_records: List[Dict[str, object]] = [] + agent_records: List[Dict[str, object]] = [] + + for item in rows: + agent_name = item["agent"] + action = item["action"] + weight = float(item["weight"] or 0.0) + try: + utils = json.loads(item["utils"] or "{}") + except json.JSONDecodeError: + utils = {} + + if agent_name == "global": + global_info = { + "action": action, + "confidence": float(utils.get("_confidence", 0.0)), + "target_weight": float(utils.get("_target_weight", 0.0)), + "department_votes": utils.get("_department_votes", {}), + "requires_review": bool(utils.get("_requires_review", False)), + } + continue + + if agent_name.startswith("dept_"): + code = agent_name.split("dept_", 1)[-1] + signals = utils.get("_signals", []) + risks = utils.get("_risks", []) + dept_records.append( + { + "部门": code, + "行动": action, + "信心": float(utils.get("_confidence", 0.0)), + "权重": weight, + "摘要": utils.get("_summary", ""), + "核心信号": ";".join(signals) if isinstance(signals, list) else signals, + "风险提示": ";".join(risks) if isinstance(risks, list) else risks, + } + ) + else: + score_map = { + key: float(val) + for key, val in utils.items() + if not str(key).startswith("_") + } + agent_records.append( + { + "代理": agent_name, + "建议动作": action, + "权重": weight, + "SELL": score_map.get("SELL", 0.0), + "HOLD": score_map.get("HOLD", 0.0), + "BUY_S": score_map.get("BUY_S", 0.0), + "BUY_M": score_map.get("BUY_M", 0.0), + "BUY_L": score_map.get("BUY_L", 0.0), + } + ) + + if feasible_actions: + st.caption(f"可行操作集合:{', '.join(feasible_actions)}") + + st.subheader("全局策略") + if global_info: + col1, col2, col3 = st.columns(3) + col1.metric("最终行动", global_info["action"]) + col2.metric("信心", f"{global_info['confidence']:.2f}") + col3.metric("目标权重", f"{global_info['target_weight']:+.2%}") + if global_info["department_votes"]: + 
st.json(global_info["department_votes"]) + if global_info["requires_review"]: + st.warning("部门分歧较大,已标记为需人工复核。") + else: + st.info("暂未写入全局策略摘要。") + + st.subheader("部门意见") + if dept_records: + dept_df = pd.DataFrame(dept_records) + st.dataframe(dept_df, use_container_width=True, hide_index=True) + else: + st.info("暂无部门记录。") + + st.subheader("代理评分") + if agent_records: + agent_df = pd.DataFrame(agent_records) + st.dataframe(agent_df, use_container_width=True, hide_index=True) + else: + st.info("暂无基础代理评分。") + + st.caption("以上内容来源于 agent_utils 表,可通过回测或实时评估自动更新。") def render_backtest() -> None: @@ -281,7 +427,7 @@ def render_settings() -> None: step=5, ) - strategy_options = ["single", "majority"] + strategy_options = ["single", "majority", "leader"] try: strategy_index = strategy_options.index(llm_cfg.strategy) except ValueError: @@ -393,6 +539,116 @@ def render_settings() -> None: st.success("LLM 设置已保存,仅在当前会话生效。") st.json(llm_config_snapshot()) + st.divider() + st.subheader("部门配置") + + dept_settings = cfg.departments or {} + dept_rows = [ + { + "code": code, + "title": dept.title, + "description": dept.description, + "weight": float(dept.weight), + "strategy": dept.llm.strategy, + "primary_provider": (dept.llm.primary.provider or "ollama"), + "primary_model": dept.llm.primary.model or "", + "ensemble_size": len(dept.llm.ensemble), + } + for code, dept in sorted(dept_settings.items()) + ] + + if not dept_rows: + st.info("当前未配置部门,可在 config.json 中添加。") + dept_rows = [] + + dept_editor = st.data_editor( + dept_rows, + num_rows="fixed", + key="department_editor", + use_container_width=True, + hide_index=True, + column_config={ + "code": st.column_config.TextColumn("编码", disabled=True), + "title": st.column_config.TextColumn("名称"), + "description": st.column_config.TextColumn("说明"), + "weight": st.column_config.NumberColumn("权重", min_value=0.0, max_value=10.0, step=0.1), + "strategy": st.column_config.SelectboxColumn( + "策略", + options=sorted(ALLOWED_LLM_STRATEGIES), + help="single=单模型, majority=多数投票, leader=顾问-决策者模式", + ), + "primary_provider": st.column_config.SelectboxColumn( + "主模型 Provider", + options=sorted(DEFAULT_LLM_MODEL_OPTIONS.keys()), + ), + "primary_model": st.column_config.TextColumn("主模型名称"), + "ensemble_size": st.column_config.NumberColumn( + "协作模型数量", + disabled=True, + help="在 config.json 中编辑 ensemble 详情", + ), + }, + ) + + if hasattr(dept_editor, "to_dict"): + dept_rows = dept_editor.to_dict("records") + else: + dept_rows = dept_editor + + col_reset, col_save = st.columns([1, 1]) + + if col_save.button("保存部门配置"): + updated_departments: Dict[str, DepartmentSettings] = {} + for row in dept_rows: + code = row.get("code") + if not code: + continue + existing = dept_settings.get(code) or DepartmentSettings(code=code, title=code) + existing.title = row.get("title") or existing.title + existing.description = row.get("description") or "" + try: + existing.weight = max(0.0, float(row.get("weight", existing.weight))) + except (TypeError, ValueError): + existing.weight = existing.weight + + strategy_val = (row.get("strategy") or existing.llm.strategy).lower() + if strategy_val in ALLOWED_LLM_STRATEGIES: + existing.llm.strategy = strategy_val + + provider_before = existing.llm.primary.provider or "" + provider_val = (row.get("primary_provider") or provider_before or "ollama").lower() + existing.llm.primary.provider = provider_val + + model_val = (row.get("primary_model") or "").strip() + if model_val: + existing.llm.primary.model = model_val + else: + existing.llm.primary.model = 
DEFAULT_LLM_MODELS.get(provider_val, existing.llm.primary.model) + + if provider_before != provider_val: + default_base = DEFAULT_LLM_BASE_URLS.get(provider_val) + existing.llm.primary.base_url = default_base or existing.llm.primary.base_url + + existing.llm.primary.__post_init__() + updated_departments[code] = existing + + if updated_departments: + cfg.departments = updated_departments + save_config() + st.success("部门配置已更新。") + else: + st.warning("未能解析部门配置输入。") + + if col_reset.button("恢复默认部门"): + from app.utils.config import _default_departments + + cfg.departments = _default_departments() + save_config() + st.success("已恢复默认部门配置。") + st.experimental_rerun() + + st.caption("部门协作模型(ensemble)请在 config.json 中手动编辑,UI 将在后续版本补充。") + def render_tests() -> None: LOGGER.info("渲染自检页面", extra=LOG_EXTRA) diff --git a/app/utils/config.py b/app/utils/config.py index 989e12e..6aa835e 100644 --- a/app/utils/config.py +++ b/app/utils/config.py @@ -95,6 +95,9 @@ DEFAULT_LLM_TIMEOUTS: Dict[str, float] = { for provider, info in DEFAULT_LLM_MODEL_OPTIONS.items() } +ALLOWED_LLM_STRATEGIES = {"single", "majority", "leader"} +LLM_STRATEGY_ALIASES = {"leader-follower": "leader"} + @dataclass class LLMEndpoint: @@ -125,10 +128,36 @@ class LLMConfig: primary: LLMEndpoint = field(default_factory=LLMEndpoint) ensemble: List[LLMEndpoint] = field(default_factory=list) - strategy: str = "single" # Options: single, majority + strategy: str = "single" # Options: single, majority, leader majority_threshold: int = 3 +@dataclass +class DepartmentSettings: + """Configuration for a single decision department.""" + + code: str + title: str + description: str = "" + weight: float = 1.0 + llm: LLMConfig = field(default_factory=LLMConfig) + + +def _default_departments() -> Dict[str, DepartmentSettings]: + presets = [ + ("momentum", "动量策略部门"), + ("value", "价值评估部门"), + ("news", "新闻情绪部门"), + ("liquidity", "流动性评估部门"), + ("macro", "宏观研究部门"), + ("risk", "风险控制部门"), + ] + return { + code: DepartmentSettings(code=code, title=title) + for code, title in presets + } + + @dataclass class AppConfig: """User configurable settings persisted in a simple structure.""" @@ -140,6 +169,7 @@ class AppConfig: agent_weights: AgentWeights = field(default_factory=AgentWeights) force_refresh: bool = False llm: LLMConfig = field(default_factory=LLMConfig) + departments: Dict[str, DepartmentSettings] = field(default_factory=_default_departments) CONFIG = AppConfig() @@ -197,14 +227,53 @@ def _load_from_file(cfg: AppConfig) -> None: if isinstance(item, dict) ] - strategy = llm_payload.get("strategy") - if strategy in {"single", "majority"}: - cfg.llm.strategy = strategy + strategy_raw = llm_payload.get("strategy") + if isinstance(strategy_raw, str): + normalized = LLM_STRATEGY_ALIASES.get(strategy_raw, strategy_raw) + if normalized in ALLOWED_LLM_STRATEGIES: + cfg.llm.strategy = normalized majority = llm_payload.get("majority_threshold") if isinstance(majority, int) and majority > 0: cfg.llm.majority_threshold = majority + departments_payload = payload.get("departments") + if isinstance(departments_payload, dict): + new_departments: Dict[str, DepartmentSettings] = {} + for code, data in departments_payload.items(): + if not isinstance(data, dict): + continue + title = data.get("title") or code + description = data.get("description") or "" + weight = float(data.get("weight", 1.0)) + llm_data = data.get("llm") + llm_cfg = LLMConfig() + if isinstance(llm_data, dict): + if isinstance(llm_data.get("primary"), dict): + llm_cfg.primary = 
_dict_to_endpoint(llm_data["primary"]) + llm_cfg.ensemble = [ + _dict_to_endpoint(item) + for item in llm_data.get("ensemble", []) + if isinstance(item, dict) + ] + strategy_raw = llm_data.get("strategy") + if isinstance(strategy_raw, str): + normalized = LLM_STRATEGY_ALIASES.get(strategy_raw, strategy_raw) + if normalized in ALLOWED_LLM_STRATEGIES: + llm_cfg.strategy = normalized + majority_raw = llm_data.get("majority_threshold") + if isinstance(majority_raw, int) and majority_raw > 0: + llm_cfg.majority_threshold = majority_raw + new_departments[code] = DepartmentSettings( + code=code, + title=title, + description=description, + weight=weight, + llm=llm_cfg, + ) + if new_departments: + cfg.departments = new_departments + def save_config(cfg: AppConfig | None = None) -> None: cfg = cfg or CONFIG @@ -214,11 +283,25 @@ def save_config(cfg: AppConfig | None = None) -> None: "force_refresh": cfg.force_refresh, "decision_method": cfg.decision_method, "llm": { - "strategy": cfg.llm.strategy, + "strategy": cfg.llm.strategy if cfg.llm.strategy in ALLOWED_LLM_STRATEGIES else "single", "majority_threshold": cfg.llm.majority_threshold, "primary": _endpoint_to_dict(cfg.llm.primary), "ensemble": [_endpoint_to_dict(ep) for ep in cfg.llm.ensemble], }, + "departments": { + code: { + "title": dept.title, + "description": dept.description, + "weight": dept.weight, + "llm": { + "strategy": dept.llm.strategy if dept.llm.strategy in ALLOWED_LLM_STRATEGIES else "single", + "majority_threshold": dept.llm.majority_threshold, + "primary": _endpoint_to_dict(dept.llm.primary), + "ensemble": [_endpoint_to_dict(ep) for ep in dept.llm.ensemble], + }, + } + for code, dept in cfg.departments.items() + }, } try: path.parent.mkdir(parents=True, exist_ok=True) diff --git a/requirements.txt b/requirements.txt index 9e94bbe..24f881a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ plotly>=5.18 streamlit>=1.30 tushare>=1.2 requests>=2.31 +python-box>=7.0
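
Reviewer note (not part of the patch): the diff above wires department-level LLM decisions into the existing `decide()` flow and persists them to `agent_utils`. The minimal sketch below shows how the new pieces fit together, mirroring what `BacktestEngine.run_daily` does after this change. It assumes the default departments from `config.json` and reachable LLM endpoints; the `ts_code` and feature values are placeholders for illustration only.

```python
# Sketch of the new department-aware decision flow (assumptions: default config,
# reachable LLM endpoints; ts_code and feature values are placeholders).
from app.agents.base import AgentContext
from app.agents.departments import DepartmentManager
from app.agents.game import decide
from app.agents.registry import default_agents
from app.utils.config import get_config

cfg = get_config()
# Build one DepartmentAgent per configured department (momentum, value, news, ...).
department_manager = DepartmentManager(cfg.departments) if cfg.departments else None

agents = default_agents()
weights = {agent.name: 1.0 for agent in agents}

context = AgentContext(
    ts_code="000001.SZ",
    trade_date="2025-01-01",
    features={"mom_20": 0.12, "pe_ttm": 9.8},  # placeholder feature vector
)

# decide() evaluates the departments (each via its own single/majority/leader
# LLM strategy), merges their scores and weights with the six base agents,
# and flags the result for review when department votes conflict.
decision = decide(
    context,
    agents,
    weights,
    method="nash",
    department_manager=department_manager,  # departments are skipped when None
)

print(decision.action.value, decision.confidence, decision.target_weight)
print(decision.department_votes, decision.requires_review)
for code, dept in decision.department_decisions.items():
    print(code, dept.action.value, dept.summary)
```

When decisions are persisted this way through `BacktestEngine.record_agent_state`, the Streamlit 今日计划 page reads the same `agent_utils` rows to display per-department opinions, vote weights, and the global action.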