diff --git a/app/backtest/optimizer.py b/app/backtest/optimizer.py
index 3fa3c14..0fe26cf 100644
--- a/app/backtest/optimizer.py
+++ b/app/backtest/optimizer.py
@@ -1,10 +1,13 @@
 """Optimization utilities for DecisionEnv-based parameter tuning."""
 from __future__ import annotations

+import math
 import random
 from dataclasses import dataclass, field
 from typing import Any, Dict, Iterable, List, Mapping, Sequence, Tuple

+import numpy as np
+
 from app.backtest.decision_env import DecisionEnv, EpisodeMetrics
 from app.backtest.decision_env import ParameterSpec
 from app.utils.logging import get_logger
@@ -16,13 +19,18 @@ LOG_EXTRA = {"stage": "decision_bandit"}

 @dataclass
 class BanditConfig:
-    """Configuration for epsilon-greedy bandit optimization."""
+    """Configuration shared by all global parameter search strategies."""

     experiment_id: str
     strategy: str = "epsilon_greedy"
     episodes: int = 20
     epsilon: float = 0.2
     seed: int | None = None
+    exploration_weight: float = 0.01
+    candidate_pool: int = 128
+    initial_candidates: int = 27
+    eta: int = 3
+    max_rounds: int = 3


 @dataclass
@@ -53,84 +61,69 @@ class BanditSummary:
         return sum(item.reward for item in self.episodes) / len(self.episodes)


-class EpsilonGreedyBandit:
-    """Simple epsilon-greedy tuner using DecisionEnv as the reward oracle."""
+class _BaseOptimizer:
+    """Shared helpers for global parameter search algorithms."""

     def __init__(self, env: DecisionEnv, config: BanditConfig) -> None:
         self.env = env
         self.config = config
-        self._random = random.Random(config.seed)
         self._specs: List[ParameterSpec] = list(getattr(env, "_specs", []))
         if not self._specs:
             raise ValueError("DecisionEnv does not expose parameter specs")
-        self._value_estimates: Dict[Tuple[float, ...], float] = {}
-        self._counts: Dict[Tuple[float, ...], int] = {}
         self._history = BanditSummary()
+        self._random = random.Random(config.seed)

-    def run(self) -> BanditSummary:
-        for episode in range(1, self.config.episodes + 1):
-            action = self._select_action()
-            self.env.reset()
-            done = False
-            cumulative_reward = 0.0
-            obs = {}
-            info: Dict[str, Any] = {}
-            while not done:
-                obs, reward, done, info = self.env.step(action)
-                cumulative_reward += reward
+    def _evaluate_action(self, action: Sequence[float]) -> Tuple[float, EpisodeMetrics, Dict[str, float], Dict[str, Any]]:
+        self.env.reset()
+        cumulative_reward = 0.0
+        obs: Dict[str, float] = {}
+        info: Dict[str, Any] = {}
+        done = False
+        while not done:
+            obs, reward, done, info = self.env.step(action)
+            cumulative_reward += reward
+        metrics = self.env.last_metrics
+        if metrics is None:
+            raise RuntimeError("DecisionEnv did not populate last_metrics")
+        return cumulative_reward, metrics, obs, info

-            metrics = self.env.last_metrics
-            if metrics is None:
-                raise RuntimeError("DecisionEnv did not populate last_metrics")
-            key = tuple(action)
-            old_estimate = self._value_estimates.get(key, 0.0)
-            count = self._counts.get(key, 0) + 1
-            self._counts[key] = count
-            self._value_estimates[key] = old_estimate + (cumulative_reward - old_estimate) / count
-
-            action_payload = self._raw_action_mapping(action)
-            resolved_action = self._resolved_action_mapping(action)
-            metrics_payload = _metrics_to_dict(metrics)
-            department_controls = info.get("department_controls")
-            if department_controls:
-                metrics_payload["department_controls"] = department_controls
-            metrics_payload["resolved_action"] = resolved_action
-            try:
-                log_tuning_result(
-                    experiment_id=self.config.experiment_id,
-                    strategy=self.config.strategy,
-                    action=action_payload,
-                    reward=cumulative_reward,
-                    metrics=metrics_payload,
-                    weights=info.get("weights"),
-                )
-            except Exception:  # noqa: BLE001
-                LOGGER.exception("failed to log tuning result", extra=LOG_EXTRA)
-
-            episode_record = BanditEpisode(
+    def _record_episode(
+        self,
+        action: Sequence[float],
+        reward: float,
+        metrics: EpisodeMetrics,
+        obs: Dict[str, float],
+        info: Dict[str, Any],
+    ) -> None:
+        action_payload = self._raw_action_mapping(action)
+        resolved_action = self._resolved_action_mapping(action)
+        metrics_payload = _metrics_to_dict(metrics)
+        department_controls = info.get("department_controls")
+        if department_controls:
+            metrics_payload["department_controls"] = department_controls
+        metrics_payload["resolved_action"] = resolved_action
+        try:
+            log_tuning_result(
+                experiment_id=self.config.experiment_id,
+                strategy=self.config.strategy,
                 action=action_payload,
-                resolved_action=resolved_action,
-                reward=cumulative_reward,
-                metrics=metrics,
-                observation=obs,
+                reward=reward,
+                metrics=metrics_payload,
                 weights=info.get("weights"),
-                department_controls=department_controls,
             )
-            self._history.episodes.append(episode_record)
-            LOGGER.info(
-                "Bandit episode=%s reward=%.4f action=%s",
-                episode,
-                cumulative_reward,
-                action_payload,
-                extra=LOG_EXTRA,
-            )
-        return self._history
+        except Exception:  # noqa: BLE001
+            LOGGER.exception("failed to log tuning result", extra=LOG_EXTRA)

-    def _select_action(self) -> List[float]:
-        if self._value_estimates and self._random.random() > self.config.epsilon:
-            best = max(self._value_estimates.items(), key=lambda item: item[1])[0]
-            return list(best)
-        return [self._sample_value(spec) for spec in self._specs]
+        episode_record = BanditEpisode(
+            action=action_payload,
+            resolved_action=resolved_action,
+            reward=reward,
+            metrics=metrics,
+            observation=obs,
+            weights=info.get("weights"),
+            department_controls=department_controls,
+        )
+        self._history.episodes.append(episode_record)

     def _raw_action_mapping(self, action: Sequence[float]) -> Dict[str, float]:
         return {
@@ -144,13 +137,142 @@ class EpsilonGreedyBandit:
             for spec, value in zip(self._specs, action, strict=True)
         }

-    def _sample_value(self, spec: ParameterSpec) -> float:
-        if spec.values:
-            if len(spec.values) <= 1:
-                return 0.0
-            index = self._random.randrange(len(spec.values))
-            return index / (len(spec.values) - 1)
-        return self._random.random()
+    def _sample_random_action(self) -> List[float]:
+        values: List[float] = []
+        for spec in self._specs:
+            if spec.values:
+                if len(spec.values) <= 1:
+                    values.append(0.0)
+                else:
+                    index = self._random.randrange(len(spec.values))
+                    values.append(index / (len(spec.values) - 1))
+            else:
+                values.append(self._random.random())
+        return values
+
+    def _mutate_action(self, action: Sequence[float], scale: float = 0.1) -> List[float]:
+        mutated = []
+        for value in action:
+            jitter = self._random.gauss(0.0, scale)
+            mutated.append(min(1.0, max(0.0, float(value + jitter))))
+        return mutated
+
+
+class EpsilonGreedyBandit(_BaseOptimizer):
+    """Epsilon-greedy tuner using DecisionEnv as the reward oracle."""
+
+    def __init__(self, env: DecisionEnv, config: BanditConfig) -> None:
+        super().__init__(env, config)
+        self._value_estimates: Dict[Tuple[float, ...], float] = {}
+        self._counts: Dict[Tuple[float, ...], int] = {}
+
+    def run(self) -> BanditSummary:
+        for episode in range(1, self.config.episodes + 1):
+            action = self._select_action()
+            reward, metrics, obs, info = self._evaluate_action(action)
+            key = tuple(action)
+            old_estimate = self._value_estimates.get(key, 0.0)
+            count = self._counts.get(key, 0) + 1
+            self._counts[key] = count
+            self._value_estimates[key] = old_estimate + (reward - old_estimate) / count
+
+            self._record_episode(action, reward, metrics, obs, info)
+            LOGGER.info(
+                "Bandit episode=%s reward=%.4f action=%s",
+                episode,
+                reward,
+                self._raw_action_mapping(action),
+                extra=LOG_EXTRA,
+            )
+        return self._history
+
+    def _select_action(self) -> List[float]:
+        if self._value_estimates and self._random.random() > self.config.epsilon:
+            best = max(self._value_estimates.items(), key=lambda item: item[1])[0]
+            return list(best)
+        return self._sample_random_action()
+
+
+class BayesianBandit(_BaseOptimizer):
+    """Gaussian-process based Bayesian optimization."""
+
+    def __init__(self, env: DecisionEnv, config: BanditConfig) -> None:
+        super().__init__(env, config)
+        self._X: List[np.ndarray] = []
+        self._y: List[float] = []
+        self._noise = 1e-6
+        self._length_scale = 0.3
+
+    def run(self) -> BanditSummary:
+        for _ in range(self.config.episodes):
+            action = self._propose_action()
+            reward, metrics, obs, info = self._evaluate_action(action)
+            self._record_episode(action, reward, metrics, obs, info)
+            self._X.append(np.array(action, dtype=float))
+            self._y.append(reward)
+        return self._history
+
+    def _propose_action(self) -> List[float]:
+        if not self._X:
+            return self._sample_random_action()
+
+        X = np.vstack(self._X)
+        y = np.asarray(self._y, dtype=float)
+        K = self._kernel(X, X) + self._noise * np.eye(len(X))
+        try:
+            K_inv = np.linalg.inv(K)
+        except np.linalg.LinAlgError:
+            K_inv = np.linalg.pinv(K)
+
+        best_y = max(y)
+        candidates = [self._sample_random_action() for _ in range(self.config.candidate_pool)]
+        ei_values: List[Tuple[float, List[float]]] = []
+        for candidate in candidates:
+            x = np.asarray(candidate, dtype=float)
+            k_star = self._kernel(X, x[None, :])[:, 0]
+            mean = float(k_star @ K_inv @ y)
+            k_ss = float(self._kernel(x[None, :], x[None, :])[0, 0])
+            variance = max(k_ss - k_star @ K_inv @ k_star, 1e-9)
+            std = math.sqrt(variance)
+            improvement = mean - best_y - self.config.exploration_weight
+            z = improvement / std if std > 0 else 0.0
+            cdf = 0.5 * (1.0 + math.erf(z / math.sqrt(2.0)))
+            pdf = (1.0 / math.sqrt(2.0 * math.pi)) * math.exp(-0.5 * z * z)
+            ei = improvement * cdf + std * pdf if std > 0 else max(improvement, 0.0)
+            ei_values.append((ei, candidate))
+
+        ei_values.sort(key=lambda item: item[0], reverse=True)
+        best = ei_values[0][1] if ei_values else self._sample_random_action()
+        return best
+
+    def _kernel(self, x1: np.ndarray, x2: np.ndarray) -> np.ndarray:
+        sq_dist = np.sum((x1[:, None, :] - x2[None, :, :]) ** 2, axis=2)
+        return np.exp(-0.5 * sq_dist / (self._length_scale ** 2))
+
+
+class SuccessiveHalvingOptimizer(_BaseOptimizer):
+    """Simplified BOHB-style successive halving optimizer."""
+
+    def run(self) -> BanditSummary:
+        num_candidates = max(1, self.config.initial_candidates)
+        eta = max(2, self.config.eta)
+        actions = [self._sample_random_action() for _ in range(num_candidates)]
+
+        for round_idx in range(self.config.max_rounds):
+            if not actions:
+                break
+            evaluations: List[Tuple[float, List[float]]] = []
+            for action in actions:
+                reward, metrics, obs, info = self._evaluate_action(action)
+                self._record_episode(action, reward, metrics, obs, info)
+                evaluations.append((reward, action))
+            evaluations.sort(key=lambda item: item[0], reverse=True)
+            survivors = max(1, len(evaluations) // eta)
+            actions = [action for _, action in evaluations[:survivors]]
+            if len(actions) == 1:
+                break
+            actions = [self._mutate_action(action, scale=0.05 * (round_idx + 1)) for action in actions]
+        return self._history


 def _metrics_to_dict(metrics: EpisodeMetrics) -> Dict[str, float | Dict[str, int]]:
@@ -159,6 +281,7 @@
         "max_drawdown": metrics.max_drawdown,
         "volatility": metrics.volatility,
         "sharpe_like": metrics.sharpe_like,
+        "calmar_like": metrics.calmar_like,
         "turnover": metrics.turnover,
         "turnover_value": metrics.turnover_value,
         "trade_count": float(metrics.trade_count),
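For reference, `BayesianBandit._propose_action` scores each random candidate with the Gaussian-process posterior mean μ(x) = k*ᵀK⁻¹y and variance σ²(x) = k(x, x) − k*ᵀK⁻¹k*, then ranks candidates by expected improvement EI = (μ − y* − ξ)Φ(z) + σφ(z) with z = (μ − y* − ξ)/σ. The standalone check below reproduces that closed form and compares it against a Monte-Carlo estimate; the μ/σ/y*/ξ numbers are made up for illustration and nothing here is part of the patch.

```python
# Sanity check for the closed-form expected improvement used in
# BayesianBandit._propose_action. Illustrative values only.
import math
import random


def expected_improvement(mean: float, std: float, best_y: float, xi: float) -> float:
    """EI = (mu - y* - xi) * Phi(z) + sigma * phi(z), z = (mu - y* - xi) / sigma."""
    improvement = mean - best_y - xi
    if std <= 0:
        return max(improvement, 0.0)
    z = improvement / std
    cdf = 0.5 * (1.0 + math.erf(z / math.sqrt(2.0)))
    pdf = math.exp(-0.5 * z * z) / math.sqrt(2.0 * math.pi)
    return improvement * cdf + std * pdf


if __name__ == "__main__":
    mean, std, best_y, xi = 0.42, 0.15, 0.40, 0.01
    analytic = expected_improvement(mean, std, best_y, xi)
    rng = random.Random(0)
    # Monte-Carlo estimate of E[max(f - best_y - xi, 0)] with f ~ N(mean, std^2).
    n = 200_000
    mc = sum(max(rng.gauss(mean, std) - best_y - xi, 0.0) for _ in range(n)) / n
    print(f"closed-form EI={analytic:.5f}  monte-carlo EI={mc:.5f}")
```

The two estimates should agree closely for a sample count this large, which is a quick way to confirm the `math.erf`-based CDF/PDF terms in the patch are wired correctly.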
diff --git a/app/ui/views/backtest.py b/app/ui/views/backtest.py
index 794dbd2..a5db9fc 100644
--- a/app/ui/views/backtest.py
+++ b/app/ui/views/backtest.py
@@ -638,6 +638,17 @@ def render_backtest_review() -> None:
         help="可选:为本次调参记录一个策略名称或备注。",
     )

+    strategy_choice = st.selectbox(
+        "搜索策略",
+        ["epsilon_greedy", "bayesian", "bohb"],
+        format_func=lambda x: {
+            "epsilon_greedy": "Epsilon-Greedy",
+            "bayesian": "贝叶斯优化",
+            "bohb": "BOHB/Successive Halving",
+        }.get(x, x),
+        key="decision_env_search_strategy",
+    )
+
     agent_objects = default_agents()
     agent_names = [agent.name for agent in agent_objects]
     if not agent_names:
@@ -841,34 +852,11 @@ def render_backtest_review() -> None:
     )

     st.divider()
-    st.subheader("自动探索(epsilon-greedy)")
-    col_ep, col_eps, col_seed = st.columns([1, 1, 1])
-    bandit_episodes = int(
-        col_ep.number_input(
-            "迭代次数",
-            min_value=1,
-            max_value=200,
-            value=10,
-            step=1,
-            key="decision_env_bandit_episodes",
-            help="探索的回合数,越大越充分但耗时越久。",
-        )
-    )
-    bandit_epsilon = float(
-        col_eps.slider(
-            "探索比例 ε",
-            min_value=0.0,
-            max_value=1.0,
-            value=0.2,
-            step=0.05,
-            key="decision_env_bandit_epsilon",
-            help="ε 越大,随机探索概率越高。",
-        )
-    )
-    seed_text = col_seed.text_input(
+    st.subheader("全局参数搜索")
+    seed_text = st.text_input(
         "随机种子(可选)",
         value="",
-        key="decision_env_bandit_seed",
+        key="decision_env_search_seed",
         help="填写整数可复现实验,不填写则随机。",
     ).strip()
     bandit_seed = None
@@ -879,7 +867,113 @@ def render_backtest_review() -> None:
             st.warning("随机种子需为整数,已忽略该值。")
             bandit_seed = None

-    run_bandit = st.button("执行自动探索", key="run_decision_env_bandit")
+    if strategy_choice == "epsilon_greedy":
+        col_ep, col_eps = st.columns([1, 1])
+        bandit_episodes = int(
+            col_ep.number_input(
+                "迭代次数",
+                min_value=1,
+                max_value=200,
+                value=10,
+                step=1,
+                key="decision_env_bandit_episodes",
+                help="探索的回合数,越大越充分但耗时越久。",
+            )
+        )
+        bandit_epsilon = float(
+            col_eps.slider(
+                "探索比例 ε",
+                min_value=0.0,
+                max_value=1.0,
+                value=0.2,
+                step=0.05,
+                key="decision_env_bandit_epsilon",
+                help="ε 越大,随机探索概率越高。",
+            )
+        )
+        bayes_iterations = bandit_episodes
+        bayes_pool = 128
+        bayes_explore = 0.01
+        bohb_initial = 27
+        bohb_eta = 3
+        bohb_rounds = 3
+    elif strategy_choice == "bayesian":
+        col_ep, col_pool, col_xi = st.columns(3)
+        bayes_iterations = int(
+            col_ep.number_input(
+                "迭代次数",
+                min_value=3,
+                max_value=200,
+                value=15,
+                step=1,
+                key="decision_env_bayes_iterations",
+            )
+        )
+        bayes_pool = int(
+            col_pool.number_input(
+                "候选采样数",
+                min_value=16,
+                max_value=1024,
+                value=128,
+                step=16,
+                key="decision_env_bayes_pool",
+            )
+        )
+        bayes_explore = float(
+            col_xi.number_input(
+                "探索权重 ξ",
+                min_value=0.0,
+                max_value=0.5,
+                value=0.01,
+                step=0.01,
+                format="%.3f",
+                key="decision_env_bayes_xi",
+            )
+        )
+        bandit_episodes = bayes_iterations
+        bandit_epsilon = 0.0
+        bohb_initial = 27
+        bohb_eta = 3
+        bohb_rounds = 3
+    else:  # bohb
+        col_init, col_eta, col_rounds = st.columns(3)
+        bohb_initial = int(
+            col_init.number_input(
+                "初始候选数",
+                min_value=3,
+                max_value=243,
+                value=27,
+                step=3,
+                key="decision_env_bohb_initial",
+            )
+        )
+        bohb_eta = int(
+            col_eta.number_input(
+                "压缩因子 η",
+                min_value=2,
+                max_value=6,
+                value=3,
+                step=1,
+                key="decision_env_bohb_eta",
+            )
+        )
+        bohb_rounds = int(
+            col_rounds.number_input(
+                "最大轮次",
+                min_value=1,
+                max_value=6,
+                value=3,
+                step=1,
+                key="decision_env_bohb_rounds",
+            )
+        )
+        bandit_episodes = bohb_initial
+        bandit_epsilon = 0.0
+        bayes_iterations = bandit_episodes
+        bayes_pool = 128
+        bayes_explore = 0.01
+
+    run_bandit = st.button("执行参数搜索", key="run_decision_env_bandit")
     if run_bandit:
         if not specs:
             st.warning("请至少配置一个动作维度再执行探索。")
@@ -912,14 +1006,25 @@ def render_backtest_review() -> None:
                 baseline_weights=baseline_weights,
                 disable_departments=disable_departments,
             )
+            search_strategy = strategy_choice
             config = BanditConfig(
                 experiment_id=experiment_id or f"bandit_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
-                strategy=strategy_label or "DecisionEnv",
+                strategy=strategy_label or search_strategy,
                 episodes=bandit_episodes,
                 epsilon=bandit_epsilon,
                 seed=bandit_seed,
+                exploration_weight=bayes_explore,
+                candidate_pool=bayes_pool,
+                initial_candidates=bohb_initial,
+                eta=bohb_eta,
+                max_rounds=bohb_rounds,
             )
-            bandit = EpsilonGreedyBandit(env, config)
+            if search_strategy == "bayesian":
+                bandit = BayesianBandit(env, config)
+            elif search_strategy == "bohb":
+                bandit = SuccessiveHalvingOptimizer(env, config)
+            else:
+                bandit = EpsilonGreedyBandit(env, config)
             with st.spinner("自动探索进行中,请稍候..."):
                 summary = bandit.run()
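The UI branch above only maps each strategy's widgets onto `BanditConfig` fields and then dispatches on `strategy_choice`; the same searches can be driven headlessly (scripts, scheduled jobs). The sketch below assumes a fully constructed `DecisionEnv` named `env`; `build_optimizer` and its default values are illustrative helpers, not part of this patch.

```python
# Illustrative only: dispatch the three search strategies outside Streamlit.
from app.backtest.decision_env import DecisionEnv
from app.backtest.optimizer import (
    BanditConfig,
    BayesianBandit,
    EpsilonGreedyBandit,
    SuccessiveHalvingOptimizer,
)


def build_optimizer(env: DecisionEnv, strategy: str, experiment_id: str):
    # One shared config object; each optimizer reads only the fields it needs.
    config = BanditConfig(
        experiment_id=experiment_id,
        strategy=strategy,
        episodes=15,            # epsilon_greedy / bayesian iteration count
        epsilon=0.2,            # epsilon_greedy only
        candidate_pool=128,     # bayesian only
        exploration_weight=0.01,
        initial_candidates=27,  # bohb only
        eta=3,
        max_rounds=3,
        seed=42,
    )
    if strategy == "bayesian":
        return BayesianBandit(env, config)
    if strategy == "bohb":
        return SuccessiveHalvingOptimizer(env, config)
    return EpsilonGreedyBandit(env, config)


# summary = build_optimizer(env, "bayesian", "exp_manual").run()
# best = summary.best_episode
```

Because the unused fields are simply ignored by the other strategies, a single `BanditConfig` can serve all three dispatch targets, which is exactly how the Streamlit view reuses it.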
diff --git a/docs/TODO.md b/docs/TODO.md
index ff6a940..5dfad57 100644
--- a/docs/TODO.md
+++ b/docs/TODO.md
@@ -22,7 +22,7 @@
 | 强化学习基线 | ✅ | PPO/SAC 等连续动作算法已接入并形成实验基线。 |
 | 奖励与评估体系 | 🔄 | 决策环境奖励已纳入风险/Turnover/Sharpe-Calmar,待接入成交与资金曲线指标。 |
 | 实时持仓链路 | ⏳ | 建立线上持仓/成交写入与离线调参与监控共享的数据源。 |
-| 全局参数搜索 | ⏳ | 引入 Bandit、贝叶斯优化或 BOHB 提供权重/参数候选。 |
+| 全局参数搜索 | ✅ | 已上线 epsilon-greedy、贝叶斯优化与 BOHB/Successive Halving 调参与指标输出。 |

 ## 多智能体协同与 LLM
diff --git a/tests/test_bandit_optimizer.py b/tests/test_bandit_optimizer.py
index 147016e..8040f28 100644
--- a/tests/test_bandit_optimizer.py
+++ b/tests/test_bandit_optimizer.py
@@ -1,10 +1,15 @@
-"""Tests for epsilon-greedy bandit optimizer."""
+"""Tests for global parameter search optimizers."""
 from __future__ import annotations

 import pytest

 from app.backtest.decision_env import EpisodeMetrics, ParameterSpec
-from app.backtest.optimizer import BanditConfig, EpsilonGreedyBandit
+from app.backtest.optimizer import (
+    BanditConfig,
+    EpsilonGreedyBandit,
+    BayesianBandit,
+    SuccessiveHalvingOptimizer,
+)
 from app.utils import tuning
@@ -84,11 +89,11 @@ def patch_logging(monkeypatch):
     return records


-def test_bandit_optimizer_runs_and_logs(patch_logging):
+def test_epsilon_greedy_optimizer(patch_logging):
     env = DummyEnv()
     optimizer = EpsilonGreedyBandit(
         env,
-        BanditConfig(experiment_id="exp", episodes=5, epsilon=0.5, seed=42),
+        BanditConfig(experiment_id="exp_eps", episodes=5, epsilon=0.5, seed=42),
     )

     summary = optimizer.run()
@@ -98,8 +103,42 @@ def test_bandit_optimizer_runs_and_logs(patch_logging):
     payload = patch_logging[0]["metrics"]
     assert isinstance(payload, dict)
     assert "risk_breakdown" in payload
-    assert "department_controls" in payload
+    assert summary.best_episode.department_controls == {"momentum": {"prompt": "baseline"}}

-    first_episode = summary.episodes[0]
-    assert first_episode.resolved_action
-    assert first_episode.department_controls == {"momentum": {"prompt": "baseline"}}
+
+def test_bayesian_optimizer(patch_logging):
+    env = DummyEnv()
+    optimizer = BayesianBandit(
+        env,
+        BanditConfig(
+            experiment_id="exp_bayes",
+            strategy="bayesian",
+            episodes=6,
+            candidate_pool=32,
+            exploration_weight=0.01,
+            seed=123,
+        ),
+    )
+    summary = optimizer.run()
+    assert summary.best_episode is not None
+    assert summary.best_episode.reward > 0.3
+    assert len(patch_logging) >= 6
+
+
+def test_successive_halving_optimizer(patch_logging):
+    env = DummyEnv()
+    optimizer = SuccessiveHalvingOptimizer(
+        env,
+        BanditConfig(
+            experiment_id="exp_bohb",
+            strategy="bohb",
+            initial_candidates=9,
+            eta=3,
+            max_rounds=2,
+            seed=7,
+        ),
+    )
+    summary = optimizer.run()
+    assert summary.best_episode is not None
+    assert summary.best_episode.reward > 0.3
+    assert len(patch_logging) >= 9
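The run-length assertions in the new tests follow directly from the search schedules: `BayesianBandit` logs exactly one tuning result per episode (6 results for `episodes=6`), while `SuccessiveHalvingOptimizer` evaluates every surviving candidate each round. A rough budget helper, mirroring the loop in `SuccessiveHalvingOptimizer.run` under the assumption that each evaluation triggers exactly one `log_tuning_result` call (as `_record_episode` does), is sketched below.

```python
# Back-of-the-envelope evaluation budget for SuccessiveHalvingOptimizer.run.
def halving_budget(initial_candidates: int, eta: int, max_rounds: int) -> int:
    eta = max(2, eta)
    alive = max(1, initial_candidates)
    total = 0
    for _ in range(max_rounds):
        total += alive                # one DecisionEnv episode per surviving candidate
        alive = max(1, alive // eta)  # keep roughly the top 1/eta of the pool
        if alive == 1:
            break                     # run() stops early once a single survivor remains
    return total


print(halving_budget(9, 3, 2))   # test configuration: 9 + 3 = 12 >= 9
print(halving_budget(27, 3, 3))  # BanditConfig defaults: 27 + 9 + 3 = 39 episodes
```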