llm-quant/app/backtest/engine.py
2025-09-28 09:39:48 +08:00

192 lines
6.6 KiB
Python

"""Backtest engine skeleton for daily bar simulation."""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from datetime import date
from typing import Dict, Iterable, List, Mapping
from app.agents.base import AgentContext
from app.agents.departments import DepartmentManager
from app.agents.game import Decision, decide
from app.agents.registry import default_agents
from app.utils.config import get_config
from app.utils.db import db_session
from app.utils.logging import get_logger
LOGGER = get_logger(__name__)
LOG_EXTRA = {"stage": "backtest"}
@dataclass
class BtConfig:
id: str
name: str
start_date: date
end_date: date
universe: List[str]
params: Dict[str, float]
method: str = "nash"
@dataclass
class PortfolioState:
cash: float = 1_000_000.0
holdings: Dict[str, float] = field(default_factory=dict)
@dataclass
class BacktestResult:
nav_series: List[Dict[str, float]] = field(default_factory=list)
trades: List[Dict[str, str]] = field(default_factory=list)
class BacktestEngine:
"""Runs the multi-agent game inside a daily event-driven loop."""
def __init__(self, cfg: BtConfig) -> None:
self.cfg = cfg
self.agents = default_agents()
app_cfg = get_config()
weight_config = app_cfg.agent_weights.as_dict() if app_cfg.agent_weights else {}
if weight_config:
self.weights = weight_config
else:
self.weights = {agent.name: 1.0 for agent in self.agents}
self.department_manager = (
DepartmentManager(app_cfg.departments) if app_cfg.departments else None
)
def load_market_data(self, trade_date: date) -> Mapping[str, Dict[str, float]]:
"""Load per-stock feature vectors. Replace with real data access."""
_ = trade_date
return {}
def simulate_day(self, trade_date: date, state: PortfolioState) -> List[Decision]:
feature_map = self.load_market_data(trade_date)
decisions: List[Decision] = []
for ts_code, features in feature_map.items():
context = AgentContext(ts_code=ts_code, trade_date=trade_date.isoformat(), features=features)
decision = decide(
context,
self.agents,
self.weights,
method=self.cfg.method,
department_manager=self.department_manager,
)
decisions.append(decision)
self.record_agent_state(context, decision)
# TODO: translate decisions into fills, holdings, and NAV updates.
_ = state
return decisions
def record_agent_state(self, context: AgentContext, decision: Decision) -> None:
payload = {
"trade_date": context.trade_date,
"ts_code": context.ts_code,
"action": decision.action.value,
"confidence": decision.confidence,
"department_votes": decision.department_votes,
"requires_review": decision.requires_review,
"departments": {
code: dept.to_dict()
for code, dept in decision.department_decisions.items()
},
}
combined_weights = dict(self.weights)
if self.department_manager:
for code, agent in self.department_manager.agents.items():
key = f"dept_{code}"
combined_weights[key] = agent.settings.weight
feasible_json = json.dumps(
[action.value for action in decision.feasible_actions],
ensure_ascii=False,
)
rows = []
for agent_name, weight in combined_weights.items():
action_scores = {
action.value: float(decision.utilities.get(action, {}).get(agent_name, 0.0))
for action in decision.utilities.keys()
}
best_action = decision.action.value
if action_scores:
best_action = max(action_scores.items(), key=lambda item: item[1])[0]
metadata: Dict[str, object] = {}
if agent_name.startswith("dept_"):
dept_code = agent_name.split("dept_", 1)[-1]
dept_decision = decision.department_decisions.get(dept_code)
if dept_decision:
metadata = {
"_summary": dept_decision.summary,
"_signals": dept_decision.signals,
"_risks": dept_decision.risks,
"_confidence": dept_decision.confidence,
}
payload_json = {**action_scores, **metadata}
rows.append(
(
context.trade_date,
context.ts_code,
agent_name,
best_action,
json.dumps(payload_json, ensure_ascii=False),
feasible_json,
float(weight),
)
)
global_payload = {
"_confidence": decision.confidence,
"_target_weight": decision.target_weight,
"_department_votes": decision.department_votes,
"_requires_review": decision.requires_review,
}
rows.append(
(
context.trade_date,
context.ts_code,
"global",
decision.action.value,
json.dumps(global_payload, ensure_ascii=False),
feasible_json,
1.0,
)
)
try:
with db_session() as conn:
conn.executemany(
"""
INSERT OR REPLACE INTO agent_utils
(trade_date, ts_code, agent, action, utils, feasible, weight)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
rows,
)
except Exception:
LOGGER.exception("写入 agent_utils 失败", extra=LOG_EXTRA)
_ = payload
# TODO: persist payload into bt_trades / audit tables when schema is ready.
def run(self) -> BacktestResult:
state = PortfolioState()
result = BacktestResult()
current = self.cfg.start_date
while current <= self.cfg.end_date:
decisions = self.simulate_day(current, state)
_ = decisions
current = date.fromordinal(current.toordinal() + 1)
return result
def run_backtest(cfg: BtConfig) -> BacktestResult:
engine = BacktestEngine(cfg)
result = engine.run()
with db_session() as conn:
_ = conn
# Implementation should persist bt_nav, bt_trades, and bt_report rows.
return result