This commit is contained in:
sam 2025-10-05 12:55:48 +08:00
parent 8aff8faebb
commit 69a3cc69c6
9 changed files with 1230 additions and 178 deletions

108
app/core/momentum.py Normal file
View File

@ -0,0 +1,108 @@
"""Momentum related indicators."""
from typing import List, Optional, Sequence, Tuple
import numpy as np
from scipy import stats
def adaptive_momentum(prices: Sequence[float],
volumes: Sequence[float],
lookback: int = 20) -> Optional[float]:
"""计算自适应动量指标。
基于价格趋势和成交量变化自适应调整动量周期
Args:
prices: 价格序列从新到旧排序
volumes: 成交量序列从新到旧排序
lookback: 基准回看期数
Returns:
动量值或None数据不足时
"""
if len(prices) < lookback or len(volumes) < lookback:
return None
# 计算价格收益率和成交量变化率
price_returns = np.array([(prices[i] - prices[i+1])/prices[i+1]
for i in range(len(prices)-1)])
vol_changes = np.array([(volumes[i] - volumes[i+1])/volumes[i+1]
for i in range(len(volumes)-1)])
if len(price_returns) < lookback:
return None
# 根据成交量变化调整权重
weights = np.exp(vol_changes[:lookback])
weights = weights / np.sum(weights)
# 计算加权动量
momentum = np.sum(price_returns[:lookback] * weights)
return momentum
def momentum_quality(prices: Sequence[float],
window: int = 20) -> Optional[float]:
"""计算动量质量指标。
基于价格趋势的一致性和强度评估动量质量
Args:
prices: 价格序列从新到旧排序
window: 计算窗口
Returns:
动量质量指标(-1到1)或None数据不足时
"""
if len(prices) < window:
return None
# 计算收益率
returns = np.array([(prices[i] - prices[i+1])/prices[i+1]
for i in range(len(prices)-1)])
if len(returns) < window:
return None
# 计算趋势一致性
returns = returns[:window]
sign_consistency = np.mean(np.sign(returns))
# 计算趋势强度
trend_strength = abs(np.sum(returns)) / (np.sum(abs(returns)) + 1e-8)
# 组合指标
quality = sign_consistency * trend_strength
return np.clip(quality, -1, 1)
def momentum_regime(prices: Sequence[float],
volumes: Sequence[float],
window: int = 20) -> Optional[float]:
"""识别动量趋势状态。
结合价格趋势和成交量特征识别动量状态
Args:
prices: 价格序列从新到旧排序
volumes: 成交量序列从新到旧排序
window: 计算窗口
Returns:
动量状态指标(-1到1)或None数据不足时
"""
if len(prices) < window or len(volumes) < window:
return None
# 计算价格动量
mom = adaptive_momentum(prices, volumes, window)
if mom is None:
return None
# 计算动量质量
quality = momentum_quality(prices, window)
if quality is None:
return None
# 组合指标
regime = 0.7 * np.sign(mom) * abs(quality) + 0.3 * quality
return np.clip(regime, -1, 1)

183
app/core/technical.py Normal file
View File

@ -0,0 +1,183 @@
"""Technical analysis indicators implementation."""
from typing import List, Optional, Sequence
import numpy as np
from .indicators import rolling_mean
def rsi(prices: Sequence[float], period: int = 14) -> Optional[float]:
"""计算相对强弱指标(RSI)。
Args:
prices: 价格序列从新到旧排序
period: RSI周期默认14天
Returns:
RSI值 (0-100) None数据不足时
"""
if len(prices) < period + 1:
return None
# 计算价格变化
deltas = [prices[i] - prices[i+1] for i in range(len(prices)-1)]
deltas = deltas[:period] # 只使用所需周期的数据
gain = [delta if delta > 0 else 0 for delta in deltas]
loss = [-delta if delta < 0 else 0 for delta in deltas]
avg_gain = sum(gain) / period
avg_loss = sum(loss) / period
if avg_loss == 0:
return 100.0
rs = avg_gain / avg_loss
rsi = 100 - (100 / (1 + rs))
return rsi
def macd(prices: Sequence[float],
fast_period: int = 12,
slow_period: int = 26,
signal_period: int = 9) -> Optional[float]:
"""计算MACD信号。
Args:
prices: 价格序列从新到旧排序
fast_period: 快线周期
slow_period: 慢线周期
signal_period: 信号线周期
Returns:
MACD柱状值或None数据不足时
"""
if len(prices) < slow_period + signal_period:
return None
# 计算快慢线EMA
fast_ema = _ema(prices, fast_period)
slow_ema = _ema(prices, slow_period)
if fast_ema is None or slow_ema is None:
return None
# 计算MACD线
macd_line = fast_ema - slow_ema
# 计算信号线
macd_values = []
for i in range(len(prices) - slow_period + 1):
fast_ema = _ema(prices[i:], fast_period)
slow_ema = _ema(prices[i:], slow_period)
if fast_ema is not None and slow_ema is not None:
macd_values.append(fast_ema - slow_ema)
if len(macd_values) < signal_period:
return None
signal_line = _ema(macd_values, signal_period)
if signal_line is None:
return None
# 返回MACD柱状图值
return macd_line - signal_line
def _ema(data: Sequence[float], period: int) -> Optional[float]:
"""计算指数移动平均。"""
if len(data) < period:
return None
multiplier = 2 / (period + 1)
ema = data[0]
for price in data[1:period]:
ema = (price - ema) * multiplier + ema
return ema
def bollinger_bands(prices: Sequence[float],
period: int = 20,
std_dev: float = 2.0) -> Optional[float]:
"""计算布林带位置。
Args:
prices: 价格序列从新到旧排序
period: 移动平均周期
std_dev: 标准差倍数
Returns:
价格在布林带中的位置(-1到1)或None数据不足时
"""
if len(prices) < period:
return None
# 获取周期内数据
price_window = prices[:period]
# 计算移动平均和标准差
ma = sum(price_window) / period
std = np.std(price_window)
# 计算布林带
upper = ma + (std_dev * std)
lower = ma - (std_dev * std)
# 计算当前价格在带中的位置
current_price = prices[0]
band_width = upper - lower
if band_width == 0:
return 0
position = 2 * (current_price - lower) / band_width - 1
return max(-1, min(1, position)) # 确保值在-1到1之间
def obv_momentum(volumes: Sequence[float],
prices: Sequence[float],
period: int = 20) -> Optional[float]:
"""计算能量潮(OBV)动量。
Args:
volumes: 成交量序列从新到旧排序
prices: 价格序列从新到旧排序
period: 计算周期
Returns:
OBV动量值或None数据不足时
"""
if len(volumes) < period + 1 or len(prices) < period + 1:
return None
# 计算OBV序列
obv = [0.0] # 初始OBV值
for i in range(1, period):
price_change = prices[i-1] - prices[i]
if price_change > 0:
obv.append(obv[-1] + volumes[i-1])
elif price_change < 0:
obv.append(obv[-1] - volumes[i-1])
else:
obv.append(obv[-1])
# 计算OBV动量当前值与N日前的差值
obv_momentum = (obv[0] - obv[-1]) / sum(volumes[:period])
return obv_momentum
def price_volume_trend(prices: Sequence[float],
volumes: Sequence[float],
period: int = 20) -> Optional[float]:
"""计算价量趋势指标。
Args:
prices: 价格序列从新到旧排序
volumes: 成交量序列从新到旧排序
period: 计算周期
Returns:
价量趋势值或None数据不足时
"""
if len(prices) < period or len(volumes) < period:
return None
# 计算价格变动和成交量的乘积
pv_values = []
for i in range(period-1):
price_change = (prices[i] - prices[i+1]) / prices[i+1]
pv_values.append(price_change * volumes[i])
# 使用移动平均平滑处理
pv_trend = sum(pv_values) / sum(volumes[:period])
return pv_trend

191
app/core/volatility.py Normal file
View File

@ -0,0 +1,191 @@
"""Advanced volatility and volume-price indicators."""
from typing import List, Optional, Sequence, Tuple
import numpy as np
import pandas as pd
from scipy import stats
from arch import arch_model
def volatility(prices: Sequence[float], window: int = 20) -> Optional[float]:
"""计算历史波动率。
Args:
prices: 价格序列从新到旧排序
window: 计算窗口
Returns:
年化波动率或None数据不足时
"""
if len(prices) < window:
return None
returns = [(prices[i] - prices[i+1]) / prices[i+1] for i in range(len(prices)-1)]
if len(returns) < window:
return None
vol = np.std(returns[:window]) * np.sqrt(252)
return vol
def garch_volatility(returns: Sequence[float],
lookback: int = 20) -> Optional[float]:
"""使用GARCH(1,1)模型估计波动率。
Args:
returns: 收益率序列从新到旧排序
lookback: 回看期数
Returns:
预测的波动率或None数据不足时
"""
if len(returns) < lookback:
return None
# 使用简化的EWMA代替GARCH
data = np.array(returns[:lookback])
decay = 0.94
# 计算历史波动率
hist_vol = np.std(data) * np.sqrt(252) # 年化
# 计算EWMA波动率
variance = hist_vol * hist_vol
for i in range(1, len(data)):
innovation = data[i-1] * data[i-1]
variance = decay * variance + (1 - decay) * innovation
return np.sqrt(variance)
def volatility_regime(prices: Sequence[float],
volumes: Sequence[float],
lookback: int = 20) -> Optional[float]:
"""识别波动率状态。
Args:
prices: 价格序列从新到旧排序
volumes: 成交量序列从新到旧排序
lookback: 回看期数
Returns:
波动率状态指标(-1到1)或None数据不足时
"""
if len(prices) < lookback or len(volumes) < lookback:
return None
# 计算收益率和成交量变化
returns = [(prices[i] - prices[i+1]) / prices[i+1] for i in range(lookback-1)]
vol_changes = [(volumes[i] - volumes[i+1]) / volumes[i+1] for i in range(lookback-1)]
# 计算波动率特征
vol = np.std(returns) * np.sqrt(252)
vol_of_vol = np.std(vol_changes)
# 结合价格波动和成交量波动判断状态
if vol > 0 and vol_of_vol > 0:
regime = 0.5 * (vol / np.mean(abs(returns)) - 1) + \
0.5 * (vol_of_vol / np.mean(abs(vol_changes)) - 1)
return np.clip(regime, -1, 1)
return 0.0
def range_volatility_prediction(prices: Sequence[float],
window: int = 10) -> Optional[float]:
"""基于价格区间的波动率预测。
Args:
prices: 价格序列从新到旧排序
window: 预测窗口
Returns:
预测的波动率或None数据不足时
"""
if len(prices) < window + 5: # 需要额外的数据来建立预测
return None
# 计算历史真实波动率
ranges = []
for i in range(len(prices)-window):
price_range = max(prices[i:i+window]) - min(prices[i:i+window])
ranges.append(price_range / prices[i])
if not ranges:
return None
# 使用历史区间分布预测未来波动率
pred_vol = np.percentile(ranges, 75) # 使用75分位数作为预测
return pred_vol
def volume_price_correlation(prices: Sequence[float],
volumes: Sequence[float],
window: int = 20) -> Optional[float]:
"""计算量价相关性。
Args:
prices: 价格序列从新到旧排序
volumes: 成交量序列从新到旧排序
window: 计算窗口
Returns:
量价相关系数(-1到1)或None数据不足时
"""
if len(prices) < window + 1 or len(volumes) < window + 1:
return None
# 计算价格和成交量的变化率
price_changes = [(prices[i] - prices[i+1]) / prices[i+1] for i in range(window)]
volume_changes = [(volumes[i] - volumes[i+1]) / volumes[i+1] for i in range(window)]
# 计算相关系数
if len(price_changes) >= 2:
corr, _ = stats.pearsonr(price_changes, volume_changes)
return corr
return None
def volume_price_divergence(prices: Sequence[float],
volumes: Sequence[float],
window: int = 10) -> Optional[float]:
"""检测量价背离。
Args:
prices: 价格序列从新到旧排序
volumes: 成交量序列从新到旧排序
window: 检测窗口
Returns:
背离强度(-1到1)或None数据不足时
"""
if len(prices) < window or len(volumes) < window:
return None
# 计算价格和成交量趋势
price_trend = sum(1 if prices[i] > prices[i+1] else -1 for i in range(window-1))
volume_trend = sum(1 if volumes[i] > volumes[i+1] else -1 for i in range(window-1))
# 计算背离程度
divergence = price_trend * volume_trend * -1 # 反向为背离
return np.clip(divergence / (window - 1), -1, 1)
def volume_intensity(volumes: Sequence[float],
prices: Sequence[float],
window: int = 5) -> Optional[float]:
"""计算成交强度。
Args:
volumes: 成交量序列从新到旧排序
prices: 价格序列从新到旧排序
window: 计算窗口
Returns:
成交强度指标或None数据不足时
"""
if len(volumes) < window + 1 or len(prices) < window + 1:
return None
# 计算价格变动
price_changes = [abs(prices[i] - prices[i+1]) for i in range(window)]
# 计算成交量加权的价格变动
weighted_changes = sum(price_changes[i] * volumes[i] for i in range(window))
total_volume = sum(volumes[:window])
if total_volume > 0:
intensity = weighted_changes / (total_volume * np.mean(prices[:window]))
return np.clip(intensity * 100, -100, 100) # 归一化到合理范围
return None

209
app/features/evaluation.py Normal file
View File

@ -0,0 +1,209 @@
"""Factor performance evaluation utilities."""
from datetime import date, timedelta
from typing import Dict, List, Optional, Sequence, Tuple
import numpy as np
from scipy import stats
from app.features.factors import (
DEFAULT_FACTORS,
FactorResult,
FactorSpec,
compute_factor_range
)
from app.utils.data_access import DataBroker
from app.utils.logging import get_logger
LOGGER = get_logger(__name__)
LOG_EXTRA = {"stage": "factor_evaluation"}
class FactorPerformance:
"""因子表现评估结果。"""
def __init__(self, factor_name: str) -> None:
self.factor_name = factor_name
self.ic_series: List[float] = []
self.rank_ic_series: List[float] = []
self.return_spreads: List[float] = []
self.sharpe_ratio: Optional[float] = None
self.turnover_rate: Optional[float] = None
@property
def ic_mean(self) -> float:
"""平均IC。"""
return np.mean(self.ic_series) if self.ic_series else 0.0
@property
def ic_std(self) -> float:
"""IC标准差。"""
return np.std(self.ic_series) if self.ic_series else 0.0
@property
def ic_ir(self) -> float:
"""信息比率。"""
return self.ic_mean / self.ic_std if self.ic_std > 0 else 0.0
@property
def rank_ic_mean(self) -> float:
"""平均RankIC。"""
return np.mean(self.rank_ic_series) if self.rank_ic_series else 0.0
def to_dict(self) -> Dict[str, float]:
"""转换为字典格式。"""
return {
"ic_mean": self.ic_mean,
"ic_std": self.ic_std,
"ic_ir": self.ic_ir,
"rank_ic_mean": self.rank_ic_mean,
"sharpe_ratio": self.sharpe_ratio or 0.0,
"turnover_rate": self.turnover_rate or 0.0
}
def evaluate_factor(
factor_name: str,
start_date: date,
end_date: date,
universe: Optional[List[str]] = None,
) -> FactorPerformance:
"""评估单个因子的预测能力。
Args:
factor_name: 因子名称
start_date: 起始日期
end_date: 结束日期
universe: 可选的股票池
Returns:
因子表现评估结果
"""
performance = FactorPerformance(factor_name)
# 计算因子值
factor_results = compute_factor_range(
start_date,
end_date,
factors=[FactorSpec(factor_name, 0)],
ts_codes=universe
)
# 按日期分组
date_groups: Dict[date, List[FactorResult]] = {}
for result in factor_results:
if result.trade_date not in date_groups:
date_groups[result.trade_date] = []
date_groups[result.trade_date].append(result)
# 计算每日IC值和RankIC值
broker = DataBroker()
for curr_date, results in sorted(date_groups.items()):
next_date = curr_date + timedelta(days=1)
# 获取因子值和次日收益率
factor_values = []
next_returns = []
for result in results:
factor_val = result.values.get(factor_name)
if factor_val is None:
continue
# 获取次日收益率
next_close = broker.fetch_latest(
result.ts_code,
next_date.strftime("%Y%m%d"),
["daily.close"]
).get("daily.close")
curr_close = broker.fetch_latest(
result.ts_code,
curr_date.strftime("%Y%m%d"),
["daily.close"]
).get("daily.close")
if next_close and curr_close and curr_close > 0:
ret = (next_close - curr_close) / curr_close
factor_values.append(factor_val)
next_returns.append(ret)
if len(factor_values) >= 20: # 需要足够多的样本
# 计算IC
ic, _ = stats.pearsonr(factor_values, next_returns)
performance.ic_series.append(ic)
# 计算RankIC
rank_ic, _ = stats.spearmanr(factor_values, next_returns)
performance.rank_ic_series.append(rank_ic)
# 计算多空组合收益
sorted_pairs = sorted(zip(factor_values, next_returns),
key=lambda x: x[0])
n = len(sorted_pairs) // 5 # 五分位
if n > 0:
top_returns = [r for _, r in sorted_pairs[-n:]]
bottom_returns = [r for _, r in sorted_pairs[:n]]
spread = np.mean(top_returns) - np.mean(bottom_returns)
performance.return_spreads.append(spread)
# 计算Sharpe比率
if performance.return_spreads:
annual_factor = np.sqrt(252) # 交易日数
returns_mean = np.mean(performance.return_spreads)
returns_std = np.std(performance.return_spreads)
if returns_std > 0:
performance.sharpe_ratio = returns_mean / returns_std * annual_factor
# 估算换手率
if factor_results:
dates = sorted(date_groups.keys())
turnovers = []
for i in range(1, len(dates)):
prev_results = date_groups[dates[i-1]]
curr_results = date_groups[dates[i]]
# 计算组合变化
prev_top = {r.ts_code for r in prev_results
if r.values.get(factor_name, float('-inf')) > np.percentile(
[res.values.get(factor_name, float('-inf'))
for res in prev_results], 80)}
curr_top = {r.ts_code for r in curr_results
if r.values.get(factor_name, float('-inf')) > np.percentile(
[res.values.get(factor_name, float('-inf'))
for res in curr_results], 80)}
# 计算换手率
if prev_top and curr_top:
turnover = len(prev_top ^ curr_top) / len(prev_top | curr_top)
turnovers.append(turnover)
if turnovers:
performance.turnover_rate = np.mean(turnovers)
return performance
def combine_factors(
factor_names: Sequence[str],
weights: Optional[Sequence[float]] = None
) -> FactorSpec:
"""组合多个因子。
Args:
factor_names: 因子名称列表
weights: 可选的权重列表默认等权重
Returns:
组合因子的规格
"""
if not weights:
weights = [1.0 / len(factor_names)] * len(factor_names)
name = "combined_" + "_".join(factor_names)
window = max(
spec.window
for spec in DEFAULT_FACTORS
if spec.name in factor_names
)
return FactorSpec(name, window)

View File

@ -8,9 +8,21 @@ end-to-end automated decision-making requirements.
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, List, Sequence
from typing import Dict, List, Sequence, Optional
from app.core.indicators import momentum, volatility, rolling_mean, normalize
import numpy as np
from app.core.indicators import momentum, rolling_mean, normalize
from app.core.technical import (
rsi, macd, bollinger_bands, obv_momentum, price_volume_trend
)
from app.core.momentum import (
adaptive_momentum, momentum_quality, momentum_regime
)
from app.core.volatility import (
volatility, garch_volatility, volatility_regime,
volume_price_correlation
)
@dataclass
@ -27,169 +39,193 @@ class FactorSpec:
# Extended factors focusing on momentum, value, and liquidity signals
EXTENDED_FACTORS: List[FactorSpec] = [
# 技术分析因子
FactorSpec("tech_rsi_14", 14), # 14日RSI
FactorSpec("tech_macd_signal", 26), # MACD信号
FactorSpec("tech_bb_position", 20), # 布林带位置
FactorSpec("tech_obv_momentum", 20), # OBV动量
FactorSpec("tech_pv_trend", 20), # 价量趋势
# 趋势跟踪因子
FactorSpec("trend_ma_cross", 20), # 均线交叉信号
FactorSpec("trend_price_channel", 20), # 价格通道突破
FactorSpec("trend_adx", 14), # 平均趋向指标
# 市场微观结构因子
FactorSpec("micro_tick_direction", 5), # 逐笔方向
FactorSpec("micro_trade_imbalance", 10), # 交易失衡
# 波动率预测因子
FactorSpec("vol_garch", 20), # GARCH波动率
FactorSpec("vol_range_pred", 10), # 波动区间预测
FactorSpec("vol_regime", 20), # 波动率状态
# 量价联合因子
FactorSpec("volume_price_corr", 20), # 量价相关性
FactorSpec("volume_price_diverge", 10), # 量价背离
FactorSpec("volume_intensity", 5), # 成交强度
# 增强动量因子
FactorSpec("mom_10_30", 0), # 10日与30日动量差
FactorSpec("mom_5_20_rank", 0), # 相对排名动量因子
FactorSpec("mom_dynamic", 0), # 动态窗口动量因子
# 波动率相关因子
FactorSpec("volat_5", 5), # 短期波动率
FactorSpec("volat_ratio", 0), # 长短期波动率比率
# 换手率扩展因子
FactorSpec("turn_60", 60), # 长期换手率
FactorSpec("turn_rank", 0), # 换手率相对排名
FactorSpec("momentum_adaptive", 20), # 自适应动量
FactorSpec("momentum_regime", 20), # 动量区间
FactorSpec("momentum_quality", 20), # 动量质量,
# 价格均线比率因子
FactorSpec("price_ma_10_ratio", 0), # 当前价格与10日均线比率
FactorSpec("price_ma_20_ratio", 0), # 当前价格与20日均线比率
FactorSpec("price_ma_60_ratio", 0), # 当前价格与60日均线比率
# 成交量均线比率因子
FactorSpec("volume_ma_5_ratio", 0), # 当前成交量与5日均线比率
FactorSpec("volume_ma_20_ratio", 0), # 当前成交量与20日均线比率
# 高级估值因子
FactorSpec("val_ps_score", 0), # PS估值评分
FactorSpec("val_multiscore", 0), # 综合估值评分
FactorSpec("val_dividend_score", 0), # 股息率估值评分
# 市场状态因子
FactorSpec("market_regime", 0), # 市场状态因子
FactorSpec("trend_strength", 0), # 趋势强度因子
FactorSpec("price_ma_10_ratio", 10), # 当前价格与10日均线比率
FactorSpec("price_ma_20_ratio", 20), # 当前价格与20日均线比率
FactorSpec("price_ma_60_ratio", 60), # 当前价格与60日均线比率
# 成交量均线比率因子
FactorSpec("volume_ma_5_ratio", 5), # 当前成交量与5日均线比率
FactorSpec("volume_ma_20_ratio", 20), # 当前成交量与20日均线比率
]
def compute_extended_factor_values(
close_series: Sequence[float],
volume_series: Sequence[float],
turnover_series: Sequence[float],
latest_fields: Dict[str, float],
) -> Dict[str, float]:
"""Compute values for extended factors.
class ExtendedFactors:
"""扩展因子计算实现类。
Args:
close_series: Closing prices series (most recent first)
volume_series: Trading volume series (most recent first)
turnover_series: Turnover rate series (most recent first)
latest_fields: Latest available fields including valuation ratios
Returns:
Dictionary mapping factor names to computed values
该类实现了一组用于量化交易的扩展因子计算包括:
1. 技术分析因子 (RSI, MACD, 布林带等)
2. 趋势跟踪因子 (均线交叉等)
3. 波动率预测因子 (GARCH, 波动率状态等)
4. 量价联合因子 (量价相关性等)
5. 动量强化因子 (自适应动量等)
6. 均线比率因子 (价格/成交量均线比率)
使用示例:
calculator = ExtendedFactors()
factor_value = calculator.compute_factor(
"tech_rsi_14",
close_series,
volume_series
)
all_factors = calculator.compute_all_factors(close_series, volume_series)
normalized = calculator.normalize_factors(all_factors)
"""
if not close_series:
return {}
def __init__(self):
"""初始化因子计算器"""
self.factor_specs = {spec.name: spec for spec in EXTENDED_FACTORS}
results: Dict[str, float] = {}
# 增强动量因子
# 10日与30日动量差
if len(close_series) >= 30:
mom_10 = momentum(close_series, 10)
mom_30 = momentum(close_series, 30)
if mom_10 is not None and mom_30 is not None:
results["mom_10_30"] = mom_10 - mom_30
# 相对排名动量因子
# 这里需要市场数据来计算相对排名,暂时使用简化版本
if len(close_series) >= 20:
mom_20 = momentum(close_series, 20)
if mom_20 is not None:
# 简化处理:将动量标准化
results["mom_5_20_rank"] = min(1.0, max(0.0, (mom_20 + 0.2) / 0.4))
# 动态窗口动量因子
# 根据波动率动态调整窗口
if len(close_series) >= 20:
volat_20 = volatility(close_series, 20)
mom_20 = momentum(close_series, 20)
if volat_20 is not None and mom_20 is not None and volat_20 > 0:
# 波动率调整后的动量
results["mom_dynamic"] = mom_20 / volat_20
# 波动率相关因子
# 短期波动率
if len(close_series) >= 5:
results["volat_5"] = volatility(close_series, 5)
# 长短期波动率比率
if len(close_series) >= 20 and len(close_series) >= 5:
volat_5 = volatility(close_series, 5)
volat_20 = volatility(close_series, 20)
if volat_5 is not None and volat_20 is not None and volat_20 > 0:
results["volat_ratio"] = volat_5 / volat_20
# 换手率扩展因子
# 长期换手率
if len(turnover_series) >= 60:
results["turn_60"] = rolling_mean(turnover_series, 60)
# 换手率相对排名
if len(turnover_series) >= 20:
turn_20 = rolling_mean(turnover_series, 20)
if turn_20 is not None:
# 简化处理:将换手率标准化
results["turn_rank"] = min(1.0, max(0.0, turn_20 / 5.0)) # 假设5%为高换手率
# 价格均线比率因子
if len(close_series) >= 10:
ma_10 = rolling_mean(close_series, 10)
if ma_10 is not None and ma_10 > 0:
results["price_ma_10_ratio"] = close_series[0] / ma_10
if len(close_series) >= 20:
ma_20 = rolling_mean(close_series, 20)
if ma_20 is not None and ma_20 > 0:
results["price_ma_20_ratio"] = close_series[0] / ma_20
if len(close_series) >= 60:
ma_60 = rolling_mean(close_series, 60)
if ma_60 is not None and ma_60 > 0:
results["price_ma_60_ratio"] = close_series[0] / ma_60
# 成交量均线比率因子
if len(volume_series) >= 5:
vol_ma_5 = rolling_mean(volume_series, 5)
if vol_ma_5 is not None and vol_ma_5 > 0:
results["volume_ma_5_ratio"] = volume_series[0] / vol_ma_5
if len(volume_series) >= 20:
vol_ma_20 = rolling_mean(volume_series, 20)
if vol_ma_20 is not None and vol_ma_20 > 0:
results["volume_ma_20_ratio"] = volume_series[0] / vol_ma_20
# 高级估值因子
ps = latest_fields.get("daily_basic.ps")
if ps is not None and ps > 0:
# PS估值评分
results["val_ps_score"] = 2.5 / (2.5 + ps) # 使用2.5作为scale参数
pe = latest_fields.get("daily_basic.pe")
pb = latest_fields.get("daily_basic.pb")
# 综合估值评分
scores = []
if pe is not None and pe > 0:
scores.append(12.0 / (12.0 + pe)) # PE评分
if pb is not None and pb > 0:
scores.append(2.5 / (2.5 + pb)) # PB评分
if ps is not None and ps > 0:
scores.append(2.5 / (2.5 + ps)) # PS评分
if scores:
results["val_multiscore"] = sum(scores) / len(scores)
dv_ratio = latest_fields.get("daily_basic.dv_ratio")
if dv_ratio is not None:
# 股息率估值评分
results["val_dividend_score"] = min(1.0, max(0.0, dv_ratio / 5.0)) # 假设5%为高股息率
# 市场状态因子
# 简单的市场状态指标:基于价格位置
if len(close_series) >= 60:
ma_60 = rolling_mean(close_series, 60)
if ma_60 is not None and ma_60 > 0:
results["market_regime"] = close_series[0] / ma_60
# 趋势强度因子
if len(close_series) >= 20:
mom_20 = momentum(close_series, 20)
volat_20 = volatility(close_series, 20)
if mom_20 is not None and volat_20 is not None and volat_20 > 0:
# 趋势强度:动量与波动率的比率
results["trend_strength"] = abs(mom_20) / volat_20
return results
def compute_factor(self,
factor_name: str,
close_series: Sequence[float],
volume_series: Sequence[float]) -> Optional[float]:
"""计算单个因子值
Args:
factor_name: 因子名称
close_series: 收盘价序列从新到旧排序
volume_series: 成交量序列从新到旧排序
Returns:
因子值如果计算失败则返回None
"""
try:
spec = self.factor_specs.get(factor_name)
if spec is None:
print(f"Unknown factor: {factor_name}")
return None
if len(close_series) < spec.window:
return None
# 技术分析因子
if factor_name == "tech_rsi_14":
return rsi(close_series, 14)
elif factor_name == "tech_macd_signal":
_, signal = macd(close_series)
return signal
elif factor_name == "tech_bb_position":
upper, lower = bollinger_bands(close_series, 20)
pos = (close_series[0] - lower) / (upper - lower + 1e-8)
return pos
elif factor_name == "tech_obv_momentum":
return obv_momentum(close_series, volume_series, 20)
elif factor_name == "tech_pv_trend":
return price_volume_trend(close_series, volume_series, 20)
# 趋势跟踪因子
elif factor_name == "trend_ma_cross":
ma_5 = rolling_mean(close_series, 5)
ma_20 = rolling_mean(close_series, 20)
return ma_5 - ma_20
# 波动率预测因子
elif factor_name == "vol_garch":
return garch_volatility(close_series, 20)
elif factor_name == "vol_regime":
regime, _ = volatility_regime(close_series, volume_series, 20)
return regime
# 量价联合因子
elif factor_name == "volume_price_corr":
return volume_price_correlation(close_series, volume_series, 20)
# 增强动量因子
elif factor_name == "momentum_adaptive":
return adaptive_momentum(close_series, volume_series, 20)
elif factor_name == "momentum_regime":
return momentum_regime(close_series, volume_series, 20)
elif factor_name == "momentum_quality":
return momentum_quality(close_series, 20)
# 均线比率因子
elif factor_name.endswith("_ratio"):
if "price_ma" in factor_name:
window = int(factor_name.split("_")[2])
ma = rolling_mean(close_series, window)
return close_series[0] / ma if ma > 0 else None
elif "volume_ma" in factor_name:
window = int(factor_name.split("_")[2])
ma = rolling_mean(volume_series, window)
return volume_series[0] / ma if ma > 0 else None
return None
except Exception as e:
print(f"Error computing factor {factor_name}: {str(e)}")
return None
def compute_all_factors(self,
close_series: Sequence[float],
volume_series: Sequence[float]) -> Dict[str, float]:
"""计算所有扩展因子值
Args:
close_series: 收盘价序列从新到旧排序
volume_series: 成交量序列从新到旧排序
Returns:
因子名称到因子值的映射字典
"""
results = {}
for factor_name in self.factor_specs:
value = self.compute_factor(factor_name, close_series, volume_series)
if value is not None:
results[factor_name] = value
return results
def normalize_factors(self, factors: Dict[str, float]) -> Dict[str, float]:
"""标准化因子值到[-1,1]区间
Args:
factors: 原始因子值字典
Returns:
标准化后的因子值字典
"""
results = {}
for name, value in factors.items():
if value is not None:
results[name] = normalize(value)
return results

View File

@ -2,6 +2,7 @@
from __future__ import annotations
import re
import sqlite3
from dataclasses import dataclass
from datetime import datetime, date, timezone
from typing import Dict, Iterable, List, Optional, Sequence, Set, Tuple, Union
@ -12,7 +13,9 @@ from app.utils.data_access import DataBroker
from app.utils.db import db_session
from app.utils.logging import get_logger
# 导入扩展因子模块
from app.features.extended_factors import compute_extended_factor_values
from app.features.extended_factors import ExtendedFactors
# 导入因子验证功能
from app.features.validation import check_data_sufficiency, detect_outliers
LOGGER = get_logger(__name__)
@ -294,9 +297,12 @@ def _compute_batch_factors(
if values:
# 检测并处理异常值
values = _detect_and_handle_outliers(values, ts_code)
batch_results.append((ts_code, values))
validation_stats["success"] += 1
cleaned_values = detect_outliers(values, ts_code, trade_date)
if cleaned_values:
batch_results.append((ts_code, cleaned_values))
validation_stats["success"] += 1
else:
validation_stats["outliers"] += 1
else:
validation_stats["skipped"] += 1
except Exception as e:
@ -318,21 +324,42 @@ def _check_data_availability(
specs: Sequence[FactorSpec],
) -> bool:
"""检查证券数据是否足够计算所有请求的因子"""
# 获取最小需要的数据天数
min_days = 1 # 至少需要当天的数据
for spec in specs:
if spec.window > min_days:
min_days = spec.window
# 检查所需的最大窗口
close_windows = [spec.window for spec in specs if spec.name.startswith(("mom_", "volat_"))]
turnover_windows = [spec.window for spec in specs if spec.name.startswith("turn_")]
max_close_window = max(close_windows) if close_windows else 0
max_turn_window = max(turnover_windows) if turnover_windows else 0
# 检查基本数据是否存在
basic_data = broker.fetch_latest(
# 获取时间序列数据
close_series = _fetch_series_values(broker, "daily", "close", ts_code, trade_date, max_close_window)
if max_close_window > 0 and not check_data_sufficiency(
close_series, max_close_window, "close", ts_code, trade_date
):
return False
turnover_series = _fetch_series_values(
broker, "daily_basic", "turnover_rate", ts_code, trade_date, max_turn_window
)
if max_turn_window > 0 and not check_data_sufficiency(
turnover_series, max_turn_window, "turnover_rate", ts_code, trade_date
):
return False
# 检查快照数据
latest_fields = broker.fetch_latest(
ts_code,
trade_date,
["daily.close", "daily_basic.turnover_rate"],
["daily.close", "daily_basic.turnover_rate", "daily_basic.pe", "daily_basic.pb"]
)
# 检查时间序列数据是否足够
close_check = broker.fetch_series("daily", "close", ts_code, trade_date, min_days)
required_fields = {"daily.close", "daily_basic.turnover_rate"}
for field in required_fields:
if latest_fields.get(field) is None:
LOGGER.warning(
"缺少必需字段 field=%s ts_code=%s date=%s",
field, ts_code, trade_date,
extra=LOG_EXTRA
)
return False
return (
bool(basic_data.get("daily.close")) and
@ -485,9 +512,8 @@ def _compute_security_factors(
)
# 计算扩展因子值
extended_factors = compute_extended_factor_values(
close_series, volume_series, turnover_series, latest_fields
)
calculator = ExtendedFactors()
extended_factors = calculator.compute_all_factors(close_series, volume_series)
results.update(extended_factors)
# 确保返回结果不为空

169
app/features/validation.py Normal file
View File

@ -0,0 +1,169 @@
"""Factor validation and quality control utilities."""
from __future__ import annotations
from typing import Dict, Optional, Sequence
import numpy as np
from app.utils.logging import get_logger
LOGGER = get_logger(__name__)
LOG_EXTRA = {"stage": "factor_validation"}
# 因子值范围限制配置
FACTOR_LIMITS = {
# 动量类因子限制在 ±50%
"mom_": (-0.5, 0.5),
# 波动率类因子限制在 0-30%
"volat_": (0, 0.3),
# 换手率类因子限制在 0-100%
"turn_": (0, 1.0),
# 估值评分类因子限制在 0-1
"val_": (0, 1.0),
# 量价类因子
"volume_": (0, 5.0),
# 市场状态类因子
"market_": (-1.0, 1.0),
}
def validate_factor_value(
name: str,
value: float,
ts_code: str,
trade_date: str
) -> Optional[float]:
"""验证单个因子值是否在合理范围内。
Args:
name: 因子名称
value: 因子值
ts_code: 股票代码
trade_date: 交易日期
Returns:
如果因子值有效则返回原值否则返回 None
"""
if value is None:
return None
# 检查是否为有限数值
if not np.isfinite(value):
LOGGER.warning(
"因子值非有限数值 factor=%s value=%f ts_code=%s date=%s",
name, value, ts_code, trade_date,
extra=LOG_EXTRA
)
return None
# 根据因子类型应用不同的限制
for prefix, (min_val, max_val) in FACTOR_LIMITS.items():
if name.startswith(prefix):
if value < min_val or value > max_val:
LOGGER.warning(
"因子值超出范围 factor=%s value=%f range=[%f,%f] ts_code=%s date=%s",
name, value, min_val, max_val, ts_code, trade_date,
extra=LOG_EXTRA
)
return None
break
return value
def detect_outliers(
values: Dict[str, float],
ts_code: str,
trade_date: str
) -> Dict[str, float]:
"""检测和处理因子值中的异常值。
Args:
values: 因子值字典
ts_code: 股票代码
trade_date: 交易日期
Returns:
处理后的因子值字典
"""
result = {}
for name, value in values.items():
validated = validate_factor_value(name, value, ts_code, trade_date)
if validated is not None:
result[name] = validated
return result
def check_data_sufficiency(
ts_code: str,
trade_date: str,
min_days: int = 60
) -> bool:
"""验证因子计算所需数据是否充分。
Args:
ts_code: 股票代码
trade_date: 交易日期
min_days: 最少需要的历史数据天数
Returns:
数据是否充分
"""
from app.utils.data_access import DataBroker
# 检查历史收盘价数据
close_series = DataBroker.get_daily_price(ts_code, end_date=trade_date)
if len(close_series) < min_days:
LOGGER.warning(
"历史数据不足 ts_code=%s date=%s min_days=%d actual=%d",
ts_code, trade_date, min_days, len(close_series),
extra=LOG_EXTRA
)
return False
# 检查日期点数据完整性
latest_fields = DataBroker.get_latest_fields(
ts_code,
trade_date,
["daily.close", "daily_basic.turnover_rate", "daily_basic.pe", "daily_basic.pb"]
)
required_fields = {"daily.close", "daily_basic.turnover_rate"}
for field in required_fields:
if latest_fields.get(field) is None:
LOGGER.warning(
"缺少必需字段 field=%s ts_code=%s date=%s",
field, ts_code, trade_date,
extra=LOG_EXTRA
)
return False
return True
"""检查数据序列是否满足计算要求。
Args:
data: 数据序列
required_length: 所需最小长度
field_name: 字段名称
ts_code: 股票代码
trade_date: 交易日期
Returns:
数据是否足够
"""
if len(data) < required_length:
LOGGER.warning(
"数据长度不足 field=%s required=%d actual=%d ts_code=%s date=%s",
field_name, required_length, len(data), ts_code, trade_date,
extra=LOG_EXTRA
)
return False
# 检查数据有效性
valid_count = sum(1 for x in data if x is not None and np.isfinite(x))
if valid_count < required_length:
LOGGER.warning(
"有效数据不足 field=%s required=%d valid=%d ts_code=%s date=%s",
field_name, required_length, valid_count, ts_code, trade_date,
extra=LOG_EXTRA
)
return False
return True

View File

@ -6,3 +6,5 @@ requests>=2.31
python-box>=7.0
pytest>=7.0
feedparser>=6.0
arch>=6.1.0
scipy>=1.11.0

View File

@ -0,0 +1,128 @@
"""Test extended factor calculations."""
import numpy as np
import pytest
from app.features.extended_factors import ExtendedFactors, FactorSpec
def test_technical_factors():
"""测试技术分析因子计算。"""
# 准备测试数据
close = np.array([100, 102, 98, 103, 97, 105, 102, 98, 103, 99,
100, 102, 98, 103, 97, 105, 102, 98, 103, 99])
volume = np.array([1000, 1200, 800, 1500, 600, 2000, 1100, 900, 1300, 700,
1000, 1200, 800, 1500, 600, 2000, 1100, 900, 1300, 700])
factors = ExtendedFactors()
# 测试RSI计算
rsi_spec = FactorSpec("tech_rsi_14", 14)
rsi = factors.compute_factor(rsi_spec, close, volume)
assert rsi is not None
assert 0 <= rsi <= 100
# 测试MACD计算
macd_spec = FactorSpec("tech_macd_signal", 26)
macd = factors.compute_factor(macd_spec, close, volume)
assert macd is not None
# 测试布林带位置
bb_spec = FactorSpec("tech_bb_position", 20)
bb = factors.compute_factor(bb_spec, close, volume)
assert bb is not None
assert 0 <= bb <= 1
# 测试OBV动量
obv_spec = FactorSpec("tech_obv_momentum", 20)
obv = factors.compute_factor(obv_spec, close, volume)
assert obv is not None
def test_momentum_factors():
"""测试动量因子计算。"""
# 准备测试数据
close = np.array([100, 102, 98, 103, 97, 105, 102, 98, 103, 99,
100, 102, 98, 103, 97, 105, 102, 98, 103, 99])
volume = np.array([1000, 1200, 800, 1500, 600, 2000, 1100, 900, 1300, 700,
1000, 1200, 800, 1500, 600, 2000, 1100, 900, 1300, 700])
factors = ExtendedFactors()
# 测试自适应动量
adaptive_spec = FactorSpec("momentum_adaptive", 20)
adaptive = factors.compute_factor(adaptive_spec, close, volume)
assert adaptive is not None
# 测试动量质量
quality_spec = FactorSpec("momentum_quality", 20)
quality = factors.compute_factor(quality_spec, close, volume)
assert quality is not None
assert -1 <= quality <= 1
# 测试动量状态
regime_spec = FactorSpec("momentum_regime", 20)
regime = factors.compute_factor(regime_spec, close, volume)
assert regime is not None
assert -1 <= regime <= 1
def test_volatility_factors():
"""测试波动率因子计算。"""
# 准备测试数据
close = np.array([100, 102, 98, 103, 97, 105, 102, 98, 103, 99,
100, 102, 98, 103, 97, 105, 102, 98, 103, 99])
volume = np.array([1000, 1200, 800, 1500, 600, 2000, 1100, 900, 1300, 700,
1000, 1200, 800, 1500, 600, 2000, 1100, 900, 1300, 700])
factors = ExtendedFactors()
# 测试GARCH波动率
garch_spec = FactorSpec("vol_garch", 20)
garch = factors.compute_factor(garch_spec, close, volume)
assert garch is not None
assert garch >= 0
# 测试波动率状态
regime_spec = FactorSpec("vol_regime", 20)
regime = factors.compute_factor(regime_spec, close, volume)
assert regime is not None
def test_compute_all_factors():
"""测试计算所有因子。"""
# 准备测试数据
close = np.array([100, 102, 98, 103, 97, 105, 102, 98, 103, 99,
100, 102, 98, 103, 97, 105, 102, 98, 103, 99])
volume = np.array([1000, 1200, 800, 1500, 600, 2000, 1100, 900, 1300, 700,
1000, 1200, 800, 1500, 600, 2000, 1100, 900, 1300, 700])
factors = ExtendedFactors()
# 计算所有因子
results = factors.compute_all_factors(close, volume)
# 验证结果
assert len(results) > 0
for value in results.values():
assert isinstance(value, (int, float))
assert not np.isnan(value)
def test_data_validation():
"""测试数据验证。"""
# 构造无效数据
close = np.array([100, np.nan, 98, 103]) # 包含NaN
volume = np.array([1000, 1200, -800, 1500]) # 包含负值
factors = ExtendedFactors()
# 测试单个因子计算
rsi_spec = FactorSpec("tech_rsi_14", 14)
rsi = factors.compute_factor(rsi_spec, close, volume)
assert rsi is None # 数据无效应返回None
# 测试整体计算
results = factors.compute_all_factors(close, volume)
assert len(results) == 0 # 无效数据应返回空字典