diff --git a/app/core/momentum.py b/app/core/momentum.py new file mode 100644 index 0000000..22ab5c5 --- /dev/null +++ b/app/core/momentum.py @@ -0,0 +1,108 @@ +"""Momentum related indicators.""" +from typing import List, Optional, Sequence, Tuple +import numpy as np +from scipy import stats + +def adaptive_momentum(prices: Sequence[float], + volumes: Sequence[float], + lookback: int = 20) -> Optional[float]: + """计算自适应动量指标。 + + 基于价格趋势和成交量变化自适应调整动量周期。 + + Args: + prices: 价格序列,从新到旧排序 + volumes: 成交量序列,从新到旧排序 + lookback: 基准回看期数 + + Returns: + 动量值,或None(数据不足时) + """ + if len(prices) < lookback or len(volumes) < lookback: + return None + + # 计算价格收益率和成交量变化率 + price_returns = np.array([(prices[i] - prices[i+1])/prices[i+1] + for i in range(len(prices)-1)]) + vol_changes = np.array([(volumes[i] - volumes[i+1])/volumes[i+1] + for i in range(len(volumes)-1)]) + + if len(price_returns) < lookback: + return None + + # 根据成交量变化调整权重 + weights = np.exp(vol_changes[:lookback]) + weights = weights / np.sum(weights) + + # 计算加权动量 + momentum = np.sum(price_returns[:lookback] * weights) + + return momentum + +def momentum_quality(prices: Sequence[float], + window: int = 20) -> Optional[float]: + """计算动量质量指标。 + + 基于价格趋势的一致性和强度评估动量质量。 + + Args: + prices: 价格序列,从新到旧排序 + window: 计算窗口 + + Returns: + 动量质量指标(-1到1),或None(数据不足时) + """ + if len(prices) < window: + return None + + # 计算收益率 + returns = np.array([(prices[i] - prices[i+1])/prices[i+1] + for i in range(len(prices)-1)]) + + if len(returns) < window: + return None + + # 计算趋势一致性 + returns = returns[:window] + sign_consistency = np.mean(np.sign(returns)) + + # 计算趋势强度 + trend_strength = abs(np.sum(returns)) / (np.sum(abs(returns)) + 1e-8) + + # 组合指标 + quality = sign_consistency * trend_strength + + return np.clip(quality, -1, 1) + +def momentum_regime(prices: Sequence[float], + volumes: Sequence[float], + window: int = 20) -> Optional[float]: + """识别动量趋势状态。 + + 结合价格趋势和成交量特征识别动量状态。 + + Args: + prices: 价格序列,从新到旧排序 + volumes: 成交量序列,从新到旧排序 + window: 计算窗口 + + Returns: + 动量状态指标(-1到1),或None(数据不足时) + """ + if len(prices) < window or len(volumes) < window: + return None + + # 计算价格动量 + mom = adaptive_momentum(prices, volumes, window) + if mom is None: + return None + + # 计算动量质量 + quality = momentum_quality(prices, window) + if quality is None: + return None + + # 组合指标 + regime = 0.7 * np.sign(mom) * abs(quality) + 0.3 * quality + + return np.clip(regime, -1, 1) diff --git a/app/core/technical.py b/app/core/technical.py new file mode 100644 index 0000000..ee5482b --- /dev/null +++ b/app/core/technical.py @@ -0,0 +1,183 @@ +"""Technical analysis indicators implementation.""" +from typing import List, Optional, Sequence +import numpy as np +from .indicators import rolling_mean + +def rsi(prices: Sequence[float], period: int = 14) -> Optional[float]: + """计算相对强弱指标(RSI)。 + + Args: + prices: 价格序列,从新到旧排序 + period: RSI周期,默认14天 + + Returns: + RSI值 (0-100) 或 None(数据不足时) + """ + if len(prices) < period + 1: + return None + + # 计算价格变化 + deltas = [prices[i] - prices[i+1] for i in range(len(prices)-1)] + deltas = deltas[:period] # 只使用所需周期的数据 + + gain = [delta if delta > 0 else 0 for delta in deltas] + loss = [-delta if delta < 0 else 0 for delta in deltas] + + avg_gain = sum(gain) / period + avg_loss = sum(loss) / period + + if avg_loss == 0: + return 100.0 + + rs = avg_gain / avg_loss + rsi = 100 - (100 / (1 + rs)) + return rsi + +def macd(prices: Sequence[float], + fast_period: int = 12, + slow_period: int = 26, + signal_period: int = 9) -> Optional[float]: + """计算MACD信号。 + + Args: + prices: 价格序列,从新到旧排序 + fast_period: 快线周期 + slow_period: 慢线周期 + signal_period: 信号线周期 + + Returns: + MACD柱状值,或None(数据不足时) + """ + if len(prices) < slow_period + signal_period: + return None + + # 计算快慢线EMA + fast_ema = _ema(prices, fast_period) + slow_ema = _ema(prices, slow_period) + if fast_ema is None or slow_ema is None: + return None + + # 计算MACD线 + macd_line = fast_ema - slow_ema + + # 计算信号线 + macd_values = [] + for i in range(len(prices) - slow_period + 1): + fast_ema = _ema(prices[i:], fast_period) + slow_ema = _ema(prices[i:], slow_period) + if fast_ema is not None and slow_ema is not None: + macd_values.append(fast_ema - slow_ema) + + if len(macd_values) < signal_period: + return None + + signal_line = _ema(macd_values, signal_period) + if signal_line is None: + return None + + # 返回MACD柱状图值 + return macd_line - signal_line + +def _ema(data: Sequence[float], period: int) -> Optional[float]: + """计算指数移动平均。""" + if len(data) < period: + return None + + multiplier = 2 / (period + 1) + ema = data[0] + for price in data[1:period]: + ema = (price - ema) * multiplier + ema + return ema + +def bollinger_bands(prices: Sequence[float], + period: int = 20, + std_dev: float = 2.0) -> Optional[float]: + """计算布林带位置。 + + Args: + prices: 价格序列,从新到旧排序 + period: 移动平均周期 + std_dev: 标准差倍数 + + Returns: + 价格在布林带中的位置(-1到1),或None(数据不足时) + """ + if len(prices) < period: + return None + + # 获取周期内数据 + price_window = prices[:period] + + # 计算移动平均和标准差 + ma = sum(price_window) / period + std = np.std(price_window) + + # 计算布林带 + upper = ma + (std_dev * std) + lower = ma - (std_dev * std) + + # 计算当前价格在带中的位置 + current_price = prices[0] + band_width = upper - lower + if band_width == 0: + return 0 + + position = 2 * (current_price - lower) / band_width - 1 + return max(-1, min(1, position)) # 确保值在-1到1之间 + +def obv_momentum(volumes: Sequence[float], + prices: Sequence[float], + period: int = 20) -> Optional[float]: + """计算能量潮(OBV)动量。 + + Args: + volumes: 成交量序列,从新到旧排序 + prices: 价格序列,从新到旧排序 + period: 计算周期 + + Returns: + OBV动量值,或None(数据不足时) + """ + if len(volumes) < period + 1 or len(prices) < period + 1: + return None + + # 计算OBV序列 + obv = [0.0] # 初始OBV值 + for i in range(1, period): + price_change = prices[i-1] - prices[i] + if price_change > 0: + obv.append(obv[-1] + volumes[i-1]) + elif price_change < 0: + obv.append(obv[-1] - volumes[i-1]) + else: + obv.append(obv[-1]) + + # 计算OBV动量(当前值与N日前的差值) + obv_momentum = (obv[0] - obv[-1]) / sum(volumes[:period]) + return obv_momentum + +def price_volume_trend(prices: Sequence[float], + volumes: Sequence[float], + period: int = 20) -> Optional[float]: + """计算价量趋势指标。 + + Args: + prices: 价格序列,从新到旧排序 + volumes: 成交量序列,从新到旧排序 + period: 计算周期 + + Returns: + 价量趋势值,或None(数据不足时) + """ + if len(prices) < period or len(volumes) < period: + return None + + # 计算价格变动和成交量的乘积 + pv_values = [] + for i in range(period-1): + price_change = (prices[i] - prices[i+1]) / prices[i+1] + pv_values.append(price_change * volumes[i]) + + # 使用移动平均平滑处理 + pv_trend = sum(pv_values) / sum(volumes[:period]) + return pv_trend diff --git a/app/core/volatility.py b/app/core/volatility.py new file mode 100644 index 0000000..1d14b2d --- /dev/null +++ b/app/core/volatility.py @@ -0,0 +1,191 @@ +"""Advanced volatility and volume-price indicators.""" +from typing import List, Optional, Sequence, Tuple +import numpy as np +import pandas as pd +from scipy import stats +from arch import arch_model + +def volatility(prices: Sequence[float], window: int = 20) -> Optional[float]: + """计算历史波动率。 + + Args: + prices: 价格序列,从新到旧排序 + window: 计算窗口 + + Returns: + 年化波动率,或None(数据不足时) + """ + if len(prices) < window: + return None + + returns = [(prices[i] - prices[i+1]) / prices[i+1] for i in range(len(prices)-1)] + if len(returns) < window: + return None + + vol = np.std(returns[:window]) * np.sqrt(252) + return vol + +def garch_volatility(returns: Sequence[float], + lookback: int = 20) -> Optional[float]: + """使用GARCH(1,1)模型估计波动率。 + + Args: + returns: 收益率序列,从新到旧排序 + lookback: 回看期数 + + Returns: + 预测的波动率,或None(数据不足时) + """ + if len(returns) < lookback: + return None + + # 使用简化的EWMA代替GARCH + data = np.array(returns[:lookback]) + decay = 0.94 + + # 计算历史波动率 + hist_vol = np.std(data) * np.sqrt(252) # 年化 + + # 计算EWMA波动率 + variance = hist_vol * hist_vol + for i in range(1, len(data)): + innovation = data[i-1] * data[i-1] + variance = decay * variance + (1 - decay) * innovation + + return np.sqrt(variance) + +def volatility_regime(prices: Sequence[float], + volumes: Sequence[float], + lookback: int = 20) -> Optional[float]: + """识别波动率状态。 + + Args: + prices: 价格序列,从新到旧排序 + volumes: 成交量序列,从新到旧排序 + lookback: 回看期数 + + Returns: + 波动率状态指标(-1到1),或None(数据不足时) + """ + if len(prices) < lookback or len(volumes) < lookback: + return None + + # 计算收益率和成交量变化 + returns = [(prices[i] - prices[i+1]) / prices[i+1] for i in range(lookback-1)] + vol_changes = [(volumes[i] - volumes[i+1]) / volumes[i+1] for i in range(lookback-1)] + + # 计算波动率特征 + vol = np.std(returns) * np.sqrt(252) + vol_of_vol = np.std(vol_changes) + + # 结合价格波动和成交量波动判断状态 + if vol > 0 and vol_of_vol > 0: + regime = 0.5 * (vol / np.mean(abs(returns)) - 1) + \ + 0.5 * (vol_of_vol / np.mean(abs(vol_changes)) - 1) + return np.clip(regime, -1, 1) + return 0.0 + +def range_volatility_prediction(prices: Sequence[float], + window: int = 10) -> Optional[float]: + """基于价格区间的波动率预测。 + + Args: + prices: 价格序列,从新到旧排序 + window: 预测窗口 + + Returns: + 预测的波动率,或None(数据不足时) + """ + if len(prices) < window + 5: # 需要额外的数据来建立预测 + return None + + # 计算历史真实波动率 + ranges = [] + for i in range(len(prices)-window): + price_range = max(prices[i:i+window]) - min(prices[i:i+window]) + ranges.append(price_range / prices[i]) + + if not ranges: + return None + + # 使用历史区间分布预测未来波动率 + pred_vol = np.percentile(ranges, 75) # 使用75分位数作为预测 + return pred_vol + +def volume_price_correlation(prices: Sequence[float], + volumes: Sequence[float], + window: int = 20) -> Optional[float]: + """计算量价相关性。 + + Args: + prices: 价格序列,从新到旧排序 + volumes: 成交量序列,从新到旧排序 + window: 计算窗口 + + Returns: + 量价相关系数(-1到1),或None(数据不足时) + """ + if len(prices) < window + 1 or len(volumes) < window + 1: + return None + + # 计算价格和成交量的变化率 + price_changes = [(prices[i] - prices[i+1]) / prices[i+1] for i in range(window)] + volume_changes = [(volumes[i] - volumes[i+1]) / volumes[i+1] for i in range(window)] + + # 计算相关系数 + if len(price_changes) >= 2: + corr, _ = stats.pearsonr(price_changes, volume_changes) + return corr + return None + +def volume_price_divergence(prices: Sequence[float], + volumes: Sequence[float], + window: int = 10) -> Optional[float]: + """检测量价背离。 + + Args: + prices: 价格序列,从新到旧排序 + volumes: 成交量序列,从新到旧排序 + window: 检测窗口 + + Returns: + 背离强度(-1到1),或None(数据不足时) + """ + if len(prices) < window or len(volumes) < window: + return None + + # 计算价格和成交量趋势 + price_trend = sum(1 if prices[i] > prices[i+1] else -1 for i in range(window-1)) + volume_trend = sum(1 if volumes[i] > volumes[i+1] else -1 for i in range(window-1)) + + # 计算背离程度 + divergence = price_trend * volume_trend * -1 # 反向为背离 + return np.clip(divergence / (window - 1), -1, 1) + +def volume_intensity(volumes: Sequence[float], + prices: Sequence[float], + window: int = 5) -> Optional[float]: + """计算成交强度。 + + Args: + volumes: 成交量序列,从新到旧排序 + prices: 价格序列,从新到旧排序 + window: 计算窗口 + + Returns: + 成交强度指标,或None(数据不足时) + """ + if len(volumes) < window + 1 or len(prices) < window + 1: + return None + + # 计算价格变动 + price_changes = [abs(prices[i] - prices[i+1]) for i in range(window)] + + # 计算成交量加权的价格变动 + weighted_changes = sum(price_changes[i] * volumes[i] for i in range(window)) + total_volume = sum(volumes[:window]) + + if total_volume > 0: + intensity = weighted_changes / (total_volume * np.mean(prices[:window])) + return np.clip(intensity * 100, -100, 100) # 归一化到合理范围 + return None diff --git a/app/features/evaluation.py b/app/features/evaluation.py new file mode 100644 index 0000000..af98d8f --- /dev/null +++ b/app/features/evaluation.py @@ -0,0 +1,209 @@ +"""Factor performance evaluation utilities.""" +from datetime import date, timedelta +from typing import Dict, List, Optional, Sequence, Tuple + +import numpy as np +from scipy import stats + +from app.features.factors import ( + DEFAULT_FACTORS, + FactorResult, + FactorSpec, + compute_factor_range +) +from app.utils.data_access import DataBroker +from app.utils.logging import get_logger + +LOGGER = get_logger(__name__) +LOG_EXTRA = {"stage": "factor_evaluation"} + + +class FactorPerformance: + """因子表现评估结果。""" + + def __init__(self, factor_name: str) -> None: + self.factor_name = factor_name + self.ic_series: List[float] = [] + self.rank_ic_series: List[float] = [] + self.return_spreads: List[float] = [] + self.sharpe_ratio: Optional[float] = None + self.turnover_rate: Optional[float] = None + + @property + def ic_mean(self) -> float: + """平均IC。""" + return np.mean(self.ic_series) if self.ic_series else 0.0 + + @property + def ic_std(self) -> float: + """IC标准差。""" + return np.std(self.ic_series) if self.ic_series else 0.0 + + @property + def ic_ir(self) -> float: + """信息比率。""" + return self.ic_mean / self.ic_std if self.ic_std > 0 else 0.0 + + @property + def rank_ic_mean(self) -> float: + """平均RankIC。""" + return np.mean(self.rank_ic_series) if self.rank_ic_series else 0.0 + + def to_dict(self) -> Dict[str, float]: + """转换为字典格式。""" + return { + "ic_mean": self.ic_mean, + "ic_std": self.ic_std, + "ic_ir": self.ic_ir, + "rank_ic_mean": self.rank_ic_mean, + "sharpe_ratio": self.sharpe_ratio or 0.0, + "turnover_rate": self.turnover_rate or 0.0 + } + + +def evaluate_factor( + factor_name: str, + start_date: date, + end_date: date, + universe: Optional[List[str]] = None, +) -> FactorPerformance: + """评估单个因子的预测能力。 + + Args: + factor_name: 因子名称 + start_date: 起始日期 + end_date: 结束日期 + universe: 可选的股票池 + + Returns: + 因子表现评估结果 + """ + performance = FactorPerformance(factor_name) + + # 计算因子值 + factor_results = compute_factor_range( + start_date, + end_date, + factors=[FactorSpec(factor_name, 0)], + ts_codes=universe + ) + + # 按日期分组 + date_groups: Dict[date, List[FactorResult]] = {} + for result in factor_results: + if result.trade_date not in date_groups: + date_groups[result.trade_date] = [] + date_groups[result.trade_date].append(result) + + # 计算每日IC值和RankIC值 + broker = DataBroker() + for curr_date, results in sorted(date_groups.items()): + next_date = curr_date + timedelta(days=1) + + # 获取因子值和次日收益率 + factor_values = [] + next_returns = [] + + for result in results: + factor_val = result.values.get(factor_name) + if factor_val is None: + continue + + # 获取次日收益率 + next_close = broker.fetch_latest( + result.ts_code, + next_date.strftime("%Y%m%d"), + ["daily.close"] + ).get("daily.close") + + curr_close = broker.fetch_latest( + result.ts_code, + curr_date.strftime("%Y%m%d"), + ["daily.close"] + ).get("daily.close") + + if next_close and curr_close and curr_close > 0: + ret = (next_close - curr_close) / curr_close + factor_values.append(factor_val) + next_returns.append(ret) + + if len(factor_values) >= 20: # 需要足够多的样本 + # 计算IC + ic, _ = stats.pearsonr(factor_values, next_returns) + performance.ic_series.append(ic) + + # 计算RankIC + rank_ic, _ = stats.spearmanr(factor_values, next_returns) + performance.rank_ic_series.append(rank_ic) + + # 计算多空组合收益 + sorted_pairs = sorted(zip(factor_values, next_returns), + key=lambda x: x[0]) + n = len(sorted_pairs) // 5 # 五分位 + if n > 0: + top_returns = [r for _, r in sorted_pairs[-n:]] + bottom_returns = [r for _, r in sorted_pairs[:n]] + spread = np.mean(top_returns) - np.mean(bottom_returns) + performance.return_spreads.append(spread) + + # 计算Sharpe比率 + if performance.return_spreads: + annual_factor = np.sqrt(252) # 交易日数 + returns_mean = np.mean(performance.return_spreads) + returns_std = np.std(performance.return_spreads) + if returns_std > 0: + performance.sharpe_ratio = returns_mean / returns_std * annual_factor + + # 估算换手率 + if factor_results: + dates = sorted(date_groups.keys()) + turnovers = [] + for i in range(1, len(dates)): + prev_results = date_groups[dates[i-1]] + curr_results = date_groups[dates[i]] + + # 计算组合变化 + prev_top = {r.ts_code for r in prev_results + if r.values.get(factor_name, float('-inf')) > np.percentile( + [res.values.get(factor_name, float('-inf')) + for res in prev_results], 80)} + curr_top = {r.ts_code for r in curr_results + if r.values.get(factor_name, float('-inf')) > np.percentile( + [res.values.get(factor_name, float('-inf')) + for res in curr_results], 80)} + + # 计算换手率 + if prev_top and curr_top: + turnover = len(prev_top ^ curr_top) / len(prev_top | curr_top) + turnovers.append(turnover) + + if turnovers: + performance.turnover_rate = np.mean(turnovers) + + return performance + + +def combine_factors( + factor_names: Sequence[str], + weights: Optional[Sequence[float]] = None +) -> FactorSpec: + """组合多个因子。 + + Args: + factor_names: 因子名称列表 + weights: 可选的权重列表,默认等权重 + + Returns: + 组合因子的规格 + """ + if not weights: + weights = [1.0 / len(factor_names)] * len(factor_names) + + name = "combined_" + "_".join(factor_names) + window = max( + spec.window + for spec in DEFAULT_FACTORS + if spec.name in factor_names + ) + + return FactorSpec(name, window) diff --git a/app/features/extended_factors.py b/app/features/extended_factors.py index 62f2f25..f164ec7 100644 --- a/app/features/extended_factors.py +++ b/app/features/extended_factors.py @@ -8,9 +8,21 @@ end-to-end automated decision-making requirements. from __future__ import annotations from dataclasses import dataclass -from typing import Dict, List, Sequence +from typing import Dict, List, Sequence, Optional -from app.core.indicators import momentum, volatility, rolling_mean, normalize +import numpy as np + +from app.core.indicators import momentum, rolling_mean, normalize +from app.core.technical import ( + rsi, macd, bollinger_bands, obv_momentum, price_volume_trend +) +from app.core.momentum import ( + adaptive_momentum, momentum_quality, momentum_regime +) +from app.core.volatility import ( + volatility, garch_volatility, volatility_regime, + volume_price_correlation +) @dataclass @@ -27,169 +39,193 @@ class FactorSpec: # Extended factors focusing on momentum, value, and liquidity signals EXTENDED_FACTORS: List[FactorSpec] = [ + # 技术分析因子 + FactorSpec("tech_rsi_14", 14), # 14日RSI + FactorSpec("tech_macd_signal", 26), # MACD信号 + FactorSpec("tech_bb_position", 20), # 布林带位置 + FactorSpec("tech_obv_momentum", 20), # OBV动量 + FactorSpec("tech_pv_trend", 20), # 价量趋势 + + # 趋势跟踪因子 + FactorSpec("trend_ma_cross", 20), # 均线交叉信号 + FactorSpec("trend_price_channel", 20), # 价格通道突破 + FactorSpec("trend_adx", 14), # 平均趋向指标 + + # 市场微观结构因子 + FactorSpec("micro_tick_direction", 5), # 逐笔方向 + FactorSpec("micro_trade_imbalance", 10), # 交易失衡 + + # 波动率预测因子 + FactorSpec("vol_garch", 20), # GARCH波动率 + FactorSpec("vol_range_pred", 10), # 波动区间预测 + FactorSpec("vol_regime", 20), # 波动率状态 + + # 量价联合因子 + FactorSpec("volume_price_corr", 20), # 量价相关性 + FactorSpec("volume_price_diverge", 10), # 量价背离 + FactorSpec("volume_intensity", 5), # 成交强度 + # 增强动量因子 - FactorSpec("mom_10_30", 0), # 10日与30日动量差 - FactorSpec("mom_5_20_rank", 0), # 相对排名动量因子 - FactorSpec("mom_dynamic", 0), # 动态窗口动量因子 - # 波动率相关因子 - FactorSpec("volat_5", 5), # 短期波动率 - FactorSpec("volat_ratio", 0), # 长短期波动率比率 - # 换手率扩展因子 - FactorSpec("turn_60", 60), # 长期换手率 - FactorSpec("turn_rank", 0), # 换手率相对排名 + FactorSpec("momentum_adaptive", 20), # 自适应动量 + FactorSpec("momentum_regime", 20), # 动量区间 + FactorSpec("momentum_quality", 20), # 动量质量, + # 价格均线比率因子 - FactorSpec("price_ma_10_ratio", 0), # 当前价格与10日均线比率 - FactorSpec("price_ma_20_ratio", 0), # 当前价格与20日均线比率 - FactorSpec("price_ma_60_ratio", 0), # 当前价格与60日均线比率 - # 成交量均线比率因子 - FactorSpec("volume_ma_5_ratio", 0), # 当前成交量与5日均线比率 - FactorSpec("volume_ma_20_ratio", 0), # 当前成交量与20日均线比率 - # 高级估值因子 - FactorSpec("val_ps_score", 0), # PS估值评分 - FactorSpec("val_multiscore", 0), # 综合估值评分 - FactorSpec("val_dividend_score", 0), # 股息率估值评分 - # 市场状态因子 - FactorSpec("market_regime", 0), # 市场状态因子 - FactorSpec("trend_strength", 0), # 趋势强度因子 + FactorSpec("price_ma_10_ratio", 10), # 当前价格与10日均线比率 + FactorSpec("price_ma_20_ratio", 20), # 当前价格与20日均线比率 + FactorSpec("price_ma_60_ratio", 60), # 当前价格与60日均线比率 + + # 成交量均线比率因子 + FactorSpec("volume_ma_5_ratio", 5), # 当前成交量与5日均线比率 + FactorSpec("volume_ma_20_ratio", 20), # 当前成交量与20日均线比率 ] -def compute_extended_factor_values( - close_series: Sequence[float], - volume_series: Sequence[float], - turnover_series: Sequence[float], - latest_fields: Dict[str, float], -) -> Dict[str, float]: - """Compute values for extended factors. +class ExtendedFactors: + """扩展因子计算实现类。 - Args: - close_series: Closing prices series (most recent first) - volume_series: Trading volume series (most recent first) - turnover_series: Turnover rate series (most recent first) - latest_fields: Latest available fields including valuation ratios - - Returns: - Dictionary mapping factor names to computed values + 该类实现了一组用于量化交易的扩展因子计算。包括: + 1. 技术分析因子 (RSI, MACD, 布林带等) + 2. 趋势跟踪因子 (均线交叉等) + 3. 波动率预测因子 (GARCH, 波动率状态等) + 4. 量价联合因子 (量价相关性等) + 5. 动量强化因子 (自适应动量等) + 6. 均线比率因子 (价格/成交量均线比率) + + 使用示例: + calculator = ExtendedFactors() + factor_value = calculator.compute_factor( + "tech_rsi_14", + close_series, + volume_series + ) + all_factors = calculator.compute_all_factors(close_series, volume_series) + normalized = calculator.normalize_factors(all_factors) """ - if not close_series: - return {} + + def __init__(self): + """初始化因子计算器""" + self.factor_specs = {spec.name: spec for spec in EXTENDED_FACTORS} - results: Dict[str, float] = {} - - # 增强动量因子 - # 10日与30日动量差 - if len(close_series) >= 30: - mom_10 = momentum(close_series, 10) - mom_30 = momentum(close_series, 30) - if mom_10 is not None and mom_30 is not None: - results["mom_10_30"] = mom_10 - mom_30 - - # 相对排名动量因子 - # 这里需要市场数据来计算相对排名,暂时使用简化版本 - if len(close_series) >= 20: - mom_20 = momentum(close_series, 20) - if mom_20 is not None: - # 简化处理:将动量标准化 - results["mom_5_20_rank"] = min(1.0, max(0.0, (mom_20 + 0.2) / 0.4)) - - # 动态窗口动量因子 - # 根据波动率动态调整窗口 - if len(close_series) >= 20: - volat_20 = volatility(close_series, 20) - mom_20 = momentum(close_series, 20) - if volat_20 is not None and mom_20 is not None and volat_20 > 0: - # 波动率调整后的动量 - results["mom_dynamic"] = mom_20 / volat_20 - - # 波动率相关因子 - # 短期波动率 - if len(close_series) >= 5: - results["volat_5"] = volatility(close_series, 5) - - # 长短期波动率比率 - if len(close_series) >= 20 and len(close_series) >= 5: - volat_5 = volatility(close_series, 5) - volat_20 = volatility(close_series, 20) - if volat_5 is not None and volat_20 is not None and volat_20 > 0: - results["volat_ratio"] = volat_5 / volat_20 - - # 换手率扩展因子 - # 长期换手率 - if len(turnover_series) >= 60: - results["turn_60"] = rolling_mean(turnover_series, 60) - - # 换手率相对排名 - if len(turnover_series) >= 20: - turn_20 = rolling_mean(turnover_series, 20) - if turn_20 is not None: - # 简化处理:将换手率标准化 - results["turn_rank"] = min(1.0, max(0.0, turn_20 / 5.0)) # 假设5%为高换手率 - - # 价格均线比率因子 - if len(close_series) >= 10: - ma_10 = rolling_mean(close_series, 10) - if ma_10 is not None and ma_10 > 0: - results["price_ma_10_ratio"] = close_series[0] / ma_10 - - if len(close_series) >= 20: - ma_20 = rolling_mean(close_series, 20) - if ma_20 is not None and ma_20 > 0: - results["price_ma_20_ratio"] = close_series[0] / ma_20 - - if len(close_series) >= 60: - ma_60 = rolling_mean(close_series, 60) - if ma_60 is not None and ma_60 > 0: - results["price_ma_60_ratio"] = close_series[0] / ma_60 - - # 成交量均线比率因子 - if len(volume_series) >= 5: - vol_ma_5 = rolling_mean(volume_series, 5) - if vol_ma_5 is not None and vol_ma_5 > 0: - results["volume_ma_5_ratio"] = volume_series[0] / vol_ma_5 - - if len(volume_series) >= 20: - vol_ma_20 = rolling_mean(volume_series, 20) - if vol_ma_20 is not None and vol_ma_20 > 0: - results["volume_ma_20_ratio"] = volume_series[0] / vol_ma_20 - - # 高级估值因子 - ps = latest_fields.get("daily_basic.ps") - if ps is not None and ps > 0: - # PS估值评分 - results["val_ps_score"] = 2.5 / (2.5 + ps) # 使用2.5作为scale参数 - - pe = latest_fields.get("daily_basic.pe") - pb = latest_fields.get("daily_basic.pb") - # 综合估值评分 - scores = [] - if pe is not None and pe > 0: - scores.append(12.0 / (12.0 + pe)) # PE评分 - if pb is not None and pb > 0: - scores.append(2.5 / (2.5 + pb)) # PB评分 - if ps is not None and ps > 0: - scores.append(2.5 / (2.5 + ps)) # PS评分 - - if scores: - results["val_multiscore"] = sum(scores) / len(scores) - - dv_ratio = latest_fields.get("daily_basic.dv_ratio") - if dv_ratio is not None: - # 股息率估值评分 - results["val_dividend_score"] = min(1.0, max(0.0, dv_ratio / 5.0)) # 假设5%为高股息率 - - # 市场状态因子 - # 简单的市场状态指标:基于价格位置 - if len(close_series) >= 60: - ma_60 = rolling_mean(close_series, 60) - if ma_60 is not None and ma_60 > 0: - results["market_regime"] = close_series[0] / ma_60 - - # 趋势强度因子 - if len(close_series) >= 20: - mom_20 = momentum(close_series, 20) - volat_20 = volatility(close_series, 20) - if mom_20 is not None and volat_20 is not None and volat_20 > 0: - # 趋势强度:动量与波动率的比率 - results["trend_strength"] = abs(mom_20) / volat_20 - - return results \ No newline at end of file + def compute_factor(self, + factor_name: str, + close_series: Sequence[float], + volume_series: Sequence[float]) -> Optional[float]: + """计算单个因子值 + + Args: + factor_name: 因子名称 + close_series: 收盘价序列,从新到旧排序 + volume_series: 成交量序列,从新到旧排序 + + Returns: + 因子值,如果计算失败则返回None + """ + try: + spec = self.factor_specs.get(factor_name) + if spec is None: + print(f"Unknown factor: {factor_name}") + return None + + if len(close_series) < spec.window: + return None + + # 技术分析因子 + if factor_name == "tech_rsi_14": + return rsi(close_series, 14) + + elif factor_name == "tech_macd_signal": + _, signal = macd(close_series) + return signal + + elif factor_name == "tech_bb_position": + upper, lower = bollinger_bands(close_series, 20) + pos = (close_series[0] - lower) / (upper - lower + 1e-8) + return pos + + elif factor_name == "tech_obv_momentum": + return obv_momentum(close_series, volume_series, 20) + + elif factor_name == "tech_pv_trend": + return price_volume_trend(close_series, volume_series, 20) + + # 趋势跟踪因子 + elif factor_name == "trend_ma_cross": + ma_5 = rolling_mean(close_series, 5) + ma_20 = rolling_mean(close_series, 20) + return ma_5 - ma_20 + + # 波动率预测因子 + elif factor_name == "vol_garch": + return garch_volatility(close_series, 20) + + elif factor_name == "vol_regime": + regime, _ = volatility_regime(close_series, volume_series, 20) + return regime + + # 量价联合因子 + elif factor_name == "volume_price_corr": + return volume_price_correlation(close_series, volume_series, 20) + + # 增强动量因子 + elif factor_name == "momentum_adaptive": + return adaptive_momentum(close_series, volume_series, 20) + + elif factor_name == "momentum_regime": + return momentum_regime(close_series, volume_series, 20) + + elif factor_name == "momentum_quality": + return momentum_quality(close_series, 20) + + # 均线比率因子 + elif factor_name.endswith("_ratio"): + if "price_ma" in factor_name: + window = int(factor_name.split("_")[2]) + ma = rolling_mean(close_series, window) + return close_series[0] / ma if ma > 0 else None + + elif "volume_ma" in factor_name: + window = int(factor_name.split("_")[2]) + ma = rolling_mean(volume_series, window) + return volume_series[0] / ma if ma > 0 else None + + return None + + except Exception as e: + print(f"Error computing factor {factor_name}: {str(e)}") + return None + + def compute_all_factors(self, + close_series: Sequence[float], + volume_series: Sequence[float]) -> Dict[str, float]: + """计算所有扩展因子值 + + Args: + close_series: 收盘价序列,从新到旧排序 + volume_series: 成交量序列,从新到旧排序 + + Returns: + 因子名称到因子值的映射字典 + """ + results = {} + + for factor_name in self.factor_specs: + value = self.compute_factor(factor_name, close_series, volume_series) + if value is not None: + results[factor_name] = value + + return results + def normalize_factors(self, factors: Dict[str, float]) -> Dict[str, float]: + """标准化因子值到[-1,1]区间 + + Args: + factors: 原始因子值字典 + + Returns: + 标准化后的因子值字典 + """ + results = {} + for name, value in factors.items(): + if value is not None: + results[name] = normalize(value) + return results \ No newline at end of file diff --git a/app/features/factors.py b/app/features/factors.py index 9130c8f..9bccd19 100644 --- a/app/features/factors.py +++ b/app/features/factors.py @@ -2,6 +2,7 @@ from __future__ import annotations import re +import sqlite3 from dataclasses import dataclass from datetime import datetime, date, timezone from typing import Dict, Iterable, List, Optional, Sequence, Set, Tuple, Union @@ -12,7 +13,9 @@ from app.utils.data_access import DataBroker from app.utils.db import db_session from app.utils.logging import get_logger # 导入扩展因子模块 -from app.features.extended_factors import compute_extended_factor_values +from app.features.extended_factors import ExtendedFactors +# 导入因子验证功能 +from app.features.validation import check_data_sufficiency, detect_outliers LOGGER = get_logger(__name__) @@ -294,9 +297,12 @@ def _compute_batch_factors( if values: # 检测并处理异常值 - values = _detect_and_handle_outliers(values, ts_code) - batch_results.append((ts_code, values)) - validation_stats["success"] += 1 + cleaned_values = detect_outliers(values, ts_code, trade_date) + if cleaned_values: + batch_results.append((ts_code, cleaned_values)) + validation_stats["success"] += 1 + else: + validation_stats["outliers"] += 1 else: validation_stats["skipped"] += 1 except Exception as e: @@ -318,21 +324,42 @@ def _check_data_availability( specs: Sequence[FactorSpec], ) -> bool: """检查证券数据是否足够计算所有请求的因子""" - # 获取最小需要的数据天数 - min_days = 1 # 至少需要当天的数据 - for spec in specs: - if spec.window > min_days: - min_days = spec.window + # 检查所需的最大窗口 + close_windows = [spec.window for spec in specs if spec.name.startswith(("mom_", "volat_"))] + turnover_windows = [spec.window for spec in specs if spec.name.startswith("turn_")] + max_close_window = max(close_windows) if close_windows else 0 + max_turn_window = max(turnover_windows) if turnover_windows else 0 - # 检查基本数据是否存在 - basic_data = broker.fetch_latest( + # 获取时间序列数据 + close_series = _fetch_series_values(broker, "daily", "close", ts_code, trade_date, max_close_window) + if max_close_window > 0 and not check_data_sufficiency( + close_series, max_close_window, "close", ts_code, trade_date + ): + return False + + turnover_series = _fetch_series_values( + broker, "daily_basic", "turnover_rate", ts_code, trade_date, max_turn_window + ) + if max_turn_window > 0 and not check_data_sufficiency( + turnover_series, max_turn_window, "turnover_rate", ts_code, trade_date + ): + return False + + # 检查快照数据 + latest_fields = broker.fetch_latest( ts_code, trade_date, - ["daily.close", "daily_basic.turnover_rate"], + ["daily.close", "daily_basic.turnover_rate", "daily_basic.pe", "daily_basic.pb"] ) - - # 检查时间序列数据是否足够 - close_check = broker.fetch_series("daily", "close", ts_code, trade_date, min_days) + required_fields = {"daily.close", "daily_basic.turnover_rate"} + for field in required_fields: + if latest_fields.get(field) is None: + LOGGER.warning( + "缺少必需字段 field=%s ts_code=%s date=%s", + field, ts_code, trade_date, + extra=LOG_EXTRA + ) + return False return ( bool(basic_data.get("daily.close")) and @@ -485,9 +512,8 @@ def _compute_security_factors( ) # 计算扩展因子值 - extended_factors = compute_extended_factor_values( - close_series, volume_series, turnover_series, latest_fields - ) + calculator = ExtendedFactors() + extended_factors = calculator.compute_all_factors(close_series, volume_series) results.update(extended_factors) # 确保返回结果不为空 diff --git a/app/features/validation.py b/app/features/validation.py new file mode 100644 index 0000000..ef1d069 --- /dev/null +++ b/app/features/validation.py @@ -0,0 +1,169 @@ +"""Factor validation and quality control utilities.""" +from __future__ import annotations + +from typing import Dict, Optional, Sequence +import numpy as np +from app.utils.logging import get_logger + +LOGGER = get_logger(__name__) +LOG_EXTRA = {"stage": "factor_validation"} + +# 因子值范围限制配置 +FACTOR_LIMITS = { + # 动量类因子限制在 ±50% + "mom_": (-0.5, 0.5), + # 波动率类因子限制在 0-30% + "volat_": (0, 0.3), + # 换手率类因子限制在 0-100% + "turn_": (0, 1.0), + # 估值评分类因子限制在 0-1 + "val_": (0, 1.0), + # 量价类因子 + "volume_": (0, 5.0), + # 市场状态类因子 + "market_": (-1.0, 1.0), +} + +def validate_factor_value( + name: str, + value: float, + ts_code: str, + trade_date: str +) -> Optional[float]: + """验证单个因子值是否在合理范围内。 + + Args: + name: 因子名称 + value: 因子值 + ts_code: 股票代码 + trade_date: 交易日期 + + Returns: + 如果因子值有效则返回原值,否则返回 None + """ + if value is None: + return None + + # 检查是否为有限数值 + if not np.isfinite(value): + LOGGER.warning( + "因子值非有限数值 factor=%s value=%f ts_code=%s date=%s", + name, value, ts_code, trade_date, + extra=LOG_EXTRA + ) + return None + + # 根据因子类型应用不同的限制 + for prefix, (min_val, max_val) in FACTOR_LIMITS.items(): + if name.startswith(prefix): + if value < min_val or value > max_val: + LOGGER.warning( + "因子值超出范围 factor=%s value=%f range=[%f,%f] ts_code=%s date=%s", + name, value, min_val, max_val, ts_code, trade_date, + extra=LOG_EXTRA + ) + return None + break + + return value + +def detect_outliers( + values: Dict[str, float], + ts_code: str, + trade_date: str +) -> Dict[str, float]: + """检测和处理因子值中的异常值。 + + Args: + values: 因子值字典 + ts_code: 股票代码 + trade_date: 交易日期 + + Returns: + 处理后的因子值字典 + """ + result = {} + + for name, value in values.items(): + validated = validate_factor_value(name, value, ts_code, trade_date) + if validated is not None: + result[name] = validated + + return result + +def check_data_sufficiency( + ts_code: str, + trade_date: str, + min_days: int = 60 +) -> bool: + """验证因子计算所需数据是否充分。 + + Args: + ts_code: 股票代码 + trade_date: 交易日期 + min_days: 最少需要的历史数据天数 + + Returns: + 数据是否充分 + """ + from app.utils.data_access import DataBroker + + # 检查历史收盘价数据 + close_series = DataBroker.get_daily_price(ts_code, end_date=trade_date) + if len(close_series) < min_days: + LOGGER.warning( + "历史数据不足 ts_code=%s date=%s min_days=%d actual=%d", + ts_code, trade_date, min_days, len(close_series), + extra=LOG_EXTRA + ) + return False + + # 检查日期点数据完整性 + latest_fields = DataBroker.get_latest_fields( + ts_code, + trade_date, + ["daily.close", "daily_basic.turnover_rate", "daily_basic.pe", "daily_basic.pb"] + ) + required_fields = {"daily.close", "daily_basic.turnover_rate"} + + for field in required_fields: + if latest_fields.get(field) is None: + LOGGER.warning( + "缺少必需字段 field=%s ts_code=%s date=%s", + field, ts_code, trade_date, + extra=LOG_EXTRA + ) + return False + + return True + """检查数据序列是否满足计算要求。 + + Args: + data: 数据序列 + required_length: 所需最小长度 + field_name: 字段名称 + ts_code: 股票代码 + trade_date: 交易日期 + + Returns: + 数据是否足够 + """ + if len(data) < required_length: + LOGGER.warning( + "数据长度不足 field=%s required=%d actual=%d ts_code=%s date=%s", + field_name, required_length, len(data), ts_code, trade_date, + extra=LOG_EXTRA + ) + return False + + # 检查数据有效性 + valid_count = sum(1 for x in data if x is not None and np.isfinite(x)) + if valid_count < required_length: + LOGGER.warning( + "有效数据不足 field=%s required=%d valid=%d ts_code=%s date=%s", + field_name, required_length, valid_count, ts_code, trade_date, + extra=LOG_EXTRA + ) + return False + + return True diff --git a/requirements.txt b/requirements.txt index f574244..333df38 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,5 @@ requests>=2.31 python-box>=7.0 pytest>=7.0 feedparser>=6.0 +arch>=6.1.0 +scipy>=1.11.0 diff --git a/tests/test_extended_factors.py b/tests/test_extended_factors.py new file mode 100644 index 0000000..e14f99c --- /dev/null +++ b/tests/test_extended_factors.py @@ -0,0 +1,128 @@ +"""Test extended factor calculations.""" + +import numpy as np +import pytest + +from app.features.extended_factors import ExtendedFactors, FactorSpec + + +def test_technical_factors(): + """测试技术分析因子计算。""" + + # 准备测试数据 + close = np.array([100, 102, 98, 103, 97, 105, 102, 98, 103, 99, + 100, 102, 98, 103, 97, 105, 102, 98, 103, 99]) + volume = np.array([1000, 1200, 800, 1500, 600, 2000, 1100, 900, 1300, 700, + 1000, 1200, 800, 1500, 600, 2000, 1100, 900, 1300, 700]) + + factors = ExtendedFactors() + + # 测试RSI计算 + rsi_spec = FactorSpec("tech_rsi_14", 14) + rsi = factors.compute_factor(rsi_spec, close, volume) + assert rsi is not None + assert 0 <= rsi <= 100 + + # 测试MACD计算 + macd_spec = FactorSpec("tech_macd_signal", 26) + macd = factors.compute_factor(macd_spec, close, volume) + assert macd is not None + + # 测试布林带位置 + bb_spec = FactorSpec("tech_bb_position", 20) + bb = factors.compute_factor(bb_spec, close, volume) + assert bb is not None + assert 0 <= bb <= 1 + + # 测试OBV动量 + obv_spec = FactorSpec("tech_obv_momentum", 20) + obv = factors.compute_factor(obv_spec, close, volume) + assert obv is not None + +def test_momentum_factors(): + """测试动量因子计算。""" + + # 准备测试数据 + close = np.array([100, 102, 98, 103, 97, 105, 102, 98, 103, 99, + 100, 102, 98, 103, 97, 105, 102, 98, 103, 99]) + volume = np.array([1000, 1200, 800, 1500, 600, 2000, 1100, 900, 1300, 700, + 1000, 1200, 800, 1500, 600, 2000, 1100, 900, 1300, 700]) + + factors = ExtendedFactors() + + # 测试自适应动量 + adaptive_spec = FactorSpec("momentum_adaptive", 20) + adaptive = factors.compute_factor(adaptive_spec, close, volume) + assert adaptive is not None + + # 测试动量质量 + quality_spec = FactorSpec("momentum_quality", 20) + quality = factors.compute_factor(quality_spec, close, volume) + assert quality is not None + assert -1 <= quality <= 1 + + # 测试动量状态 + regime_spec = FactorSpec("momentum_regime", 20) + regime = factors.compute_factor(regime_spec, close, volume) + assert regime is not None + assert -1 <= regime <= 1 + +def test_volatility_factors(): + """测试波动率因子计算。""" + + # 准备测试数据 + close = np.array([100, 102, 98, 103, 97, 105, 102, 98, 103, 99, + 100, 102, 98, 103, 97, 105, 102, 98, 103, 99]) + volume = np.array([1000, 1200, 800, 1500, 600, 2000, 1100, 900, 1300, 700, + 1000, 1200, 800, 1500, 600, 2000, 1100, 900, 1300, 700]) + + factors = ExtendedFactors() + + # 测试GARCH波动率 + garch_spec = FactorSpec("vol_garch", 20) + garch = factors.compute_factor(garch_spec, close, volume) + assert garch is not None + assert garch >= 0 + + # 测试波动率状态 + regime_spec = FactorSpec("vol_regime", 20) + regime = factors.compute_factor(regime_spec, close, volume) + assert regime is not None + +def test_compute_all_factors(): + """测试计算所有因子。""" + + # 准备测试数据 + close = np.array([100, 102, 98, 103, 97, 105, 102, 98, 103, 99, + 100, 102, 98, 103, 97, 105, 102, 98, 103, 99]) + volume = np.array([1000, 1200, 800, 1500, 600, 2000, 1100, 900, 1300, 700, + 1000, 1200, 800, 1500, 600, 2000, 1100, 900, 1300, 700]) + + factors = ExtendedFactors() + + # 计算所有因子 + results = factors.compute_all_factors(close, volume) + + # 验证结果 + assert len(results) > 0 + for value in results.values(): + assert isinstance(value, (int, float)) + assert not np.isnan(value) + +def test_data_validation(): + """测试数据验证。""" + + # 构造无效数据 + close = np.array([100, np.nan, 98, 103]) # 包含NaN + volume = np.array([1000, 1200, -800, 1500]) # 包含负值 + + factors = ExtendedFactors() + + # 测试单个因子计算 + rsi_spec = FactorSpec("tech_rsi_14", 14) + rsi = factors.compute_factor(rsi_spec, close, volume) + assert rsi is None # 数据无效应返回None + + # 测试整体计算 + results = factors.compute_all_factors(close, volume) + assert len(results) == 0 # 无效数据应返回空字典