diff --git a/app/features/extended_factors.py b/app/features/extended_factors.py index 71ecf54..57a3abf 100644 --- a/app/features/extended_factors.py +++ b/app/features/extended_factors.py @@ -61,6 +61,7 @@ from app.core.volatility import ( volatility, garch_volatility, volatility_regime, volume_price_correlation ) +from app.features.validation import validate_factor_value @dataclass @@ -159,11 +160,12 @@ class ExtendedFactors: def __init__(self): """初始化因子计算器,构建因子规格映射""" self.factor_specs = {spec.name: spec for spec in EXTENDED_FACTORS} - LOGGER.info( - "初始化因子计算器,加载因子数量: %d", - len(self.factor_specs), - extra=LOG_EXTRA - ) + # 关闭初始化日志打印 + # LOGGER.info( + # "初始化因子计算器,加载因子数量: %d", + # len(self.factor_specs), + # extra=LOG_EXTRA + # ) @handle_factor_errors def compute_factor(self, @@ -490,16 +492,28 @@ class ExtendedFactors: for factor_name in self.factor_specs: value = self.compute_factor(factor_name, close_series, volume_series) if value is not None: - results[factor_name] = value - success_count += 1 + # 验证因子值是否在合理范围内 + validated_value = validate_factor_value( + factor_name, value, "unknown", "unknown" + ) + if validated_value is not None: + results[factor_name] = validated_value + success_count += 1 + else: + LOGGER.debug( + "因子值验证失败 factor=%s value=%f", + factor_name, value, + extra=LOG_EXTRA + ) - LOGGER.info( - "因子计算完成 total=%d success=%d failed=%d", - total_count, - success_count, - total_count - success_count, - extra=LOG_EXTRA - ) + # 关闭因子计算完成日志打印 + # LOGGER.info( + # "因子计算完成 total=%d success=%d failed=%d", + # total_count, + # success_count, + # total_count - success_count, + # extra=LOG_EXTRA + # ) return results diff --git a/app/features/factors.py b/app/features/factors.py index 55b765a..99659bd 100644 --- a/app/features/factors.py +++ b/app/features/factors.py @@ -18,6 +18,8 @@ from app.features.sentiment_factors import SentimentFactors from app.features.value_risk_factors import ValueRiskFactors # 导入因子验证功能 from app.features.validation import check_data_sufficiency, detect_outliers +# 导入UI进度状态管理 +from app.ui.progress_state import factor_progress LOGGER = get_logger(__name__) @@ -162,42 +164,71 @@ def compute_factors( results: List[FactorResult] = [] rows_to_persist: List[tuple[str, Dict[str, float | None]]] = [] - # 分批处理以优化性能 - for i in range(0, len(universe), batch_size): - batch = universe[i:i+batch_size] - batch_results = _compute_batch_factors(broker, batch, trade_date_str, specs, validation_stats) + try: + # 启动UI进度状态 + factor_progress.start_calculation( + total_securities=len(universe), + total_batches=(len(universe) + batch_size - 1) // batch_size + ) - for ts_code, values in batch_results: - if values: - results.append(FactorResult(ts_code=ts_code, trade_date=trade_date, values=values)) - rows_to_persist.append((ts_code, values)) - - # 显示进度 - processed = min(i + batch_size, len(universe)) - if processed % (batch_size * 5) == 0 or processed == len(universe): - LOGGER.info( - "因子计算进度: %s/%s (%.1f%%) 成功:%s 跳过:%s 数据缺失:%s 异常值:%s", - processed, len(universe), - (processed / len(universe)) * 100, - validation_stats["success"], - validation_stats["skipped"], - validation_stats["data_missing"], - validation_stats["outliers"], - extra=LOG_EXTRA, + # 分批处理以优化性能 + for i in range(0, len(universe), batch_size): + batch = universe[i:i+batch_size] + batch_results = _compute_batch_factors( + broker, + batch, + trade_date_str, + specs, + validation_stats, + batch_index=i // batch_size, + total_batches=(len(universe) + batch_size - 1) // batch_size, + processed_securities=i, + total_securities=len(universe) ) + + for ts_code, values in batch_results: + if values: + results.append(FactorResult(ts_code=ts_code, trade_date=trade_date, values=values)) + rows_to_persist.append((ts_code, values)) + + # 显示进度 + processed = min(i + batch_size, len(universe)) + if processed % (batch_size * 5) == 0 or processed == len(universe): + LOGGER.info( + "因子计算进度: %s/%s (%.1f%%) 成功:%s 跳过:%s 数据缺失:%s 异常值:%s", + processed, len(universe), + (processed / len(universe)) * 100, + validation_stats["success"], + validation_stats["skipped"], + validation_stats["data_missing"], + validation_stats["outliers"], + extra=LOG_EXTRA, + ) - if rows_to_persist: - _persist_factor_rows(trade_date_str, rows_to_persist, specs) + if rows_to_persist: + _persist_factor_rows(trade_date_str, rows_to_persist, specs) - LOGGER.info( - "因子计算完成 总数量:%s 成功:%s 失败:%s", - len(universe), - validation_stats["success"], - validation_stats["total"] - validation_stats["success"], - extra=LOG_EXTRA, - ) - - return results + # 更新UI进度状态为完成 + factor_progress.complete_calculation( + message=f"因子计算完成: 总数量={len(universe)}, 成功={validation_stats['success']}, 失败={len(universe) - validation_stats['success']}" + ) + + LOGGER.info( + "因子计算完成 总数量:%s 成功:%s 失败:%s", + len(universe), + validation_stats["success"], + validation_stats["total"] - validation_stats["success"], + extra=LOG_EXTRA, + ) + + return results + + except Exception as exc: + # 发生错误时更新UI状态 + error_message = f"因子计算过程中发生错误: {exc}" + factor_progress.error_occurred(error_message) + LOGGER.error(error_message, extra=LOG_EXTRA) + raise def compute_factor_range( @@ -290,6 +321,10 @@ def _compute_batch_factors( trade_date: str, specs: Sequence[FactorSpec], validation_stats: Dict[str, int], + batch_index: int = 0, + total_batches: int = 1, + processed_securities: int = 0, + total_securities: int = 0, ) -> List[tuple[str, Dict[str, float | None]]]: """批量计算多个证券的因子值,提高计算效率""" batch_results = [] @@ -297,7 +332,17 @@ def _compute_batch_factors( # 批次化数据可用性检查 available_codes = _check_batch_data_availability(broker, ts_codes, trade_date, specs) - for ts_code in ts_codes: + # 更新UI进度状态 + if total_securities > 0: + current_progress = processed_securities + len(available_codes) + progress_percentage = (current_progress / total_securities) * 100 + factor_progress.update_progress( + current_securities=current_progress, + current_batch=batch_index + 1, + message=f"处理批次 {batch_index + 1}/{total_batches} - 证券 {current_progress}/{total_securities} ({progress_percentage:.1f}%)" + ) + + for i, ts_code in enumerate(ts_codes): try: # 检查数据可用性(使用批次化结果) if ts_code not in available_codes: @@ -313,10 +358,36 @@ def _compute_batch_factors( if cleaned_values: batch_results.append((ts_code, cleaned_values)) validation_stats["success"] += 1 + + # 记录验证统计信息 + original_count = len(values) + cleaned_count = len(cleaned_values) + if cleaned_count < original_count: + validation_stats["outliers"] += (original_count - cleaned_count) + LOGGER.debug( + "因子值验证结果 ts_code=%s date=%s original=%d cleaned=%d", + ts_code, trade_date, original_count, cleaned_count, + extra=LOG_EXTRA + ) else: - validation_stats["outliers"] += 1 + validation_stats["outliers"] += len(values) + LOGGER.warning( + "所有因子值均被标记为异常值 ts_code=%s date=%s", + ts_code, trade_date, + extra=LOG_EXTRA + ) else: validation_stats["skipped"] += 1 + + # 每处理10个证券更新一次进度 + if (i + 1) % 10 == 0 and total_securities > 0: + current_progress = processed_securities + i + 1 + progress_percentage = (current_progress / total_securities) * 100 + factor_progress.update_progress( + current_securities=current_progress, + current_batch=batch_index + 1, + message=f"处理批次 {batch_index + 1}/{total_batches} - 证券 {current_progress}/{total_securities} ({progress_percentage:.1f}%)" + ) except Exception as e: LOGGER.error( "计算因子失败 ts_code=%s err=%s", diff --git a/app/features/validation.py b/app/features/validation.py index 6ffdcc2..abe14ca 100644 --- a/app/features/validation.py +++ b/app/features/validation.py @@ -8,28 +8,55 @@ from app.utils.logging import get_logger LOGGER = get_logger(__name__) LOG_EXTRA = {"stage": "factor_validation"} -# 因子值范围限制配置 +# 因子值范围限制配置 - 基于物理规律和实际数据特征 FACTOR_LIMITS = { - # 动量类因子限制在 ±100% - "mom_": (-1.0, 1.0), - # 波动率类因子限制在 0-50% - "volat_": (0, 0.5), - # 换手率类因子限制在 0-1000% (实际换手率可能很高) - "turn_": (0, 10.0), - # 估值评分类因子限制在 -1到1 - "val_": (-1.0, 1.0), - # 量价类因子 + # 动量类因子:收益率相关,限制在 ±50% (实际收益率很少超过这个范围) + "mom_": (-0.5, 0.5), + "momentum_": (-0.5, 0.5), + + # 波动率类因子:年化波动率,限制在 0-200% (考虑极端市场情况) + "volat_": (0, 2.0), + "vol_": (0, 2.0), + + # 换手率类因子:日换手率,限制在 0-100% (实际换手率通常在这个范围内) + "turn_": (0, 1.0), + + # 估值评分类因子:标准化评分,限制在 -3到3 (Z-score标准化范围) + "val_": (-3.0, 3.0), + + # 量价类因子:成交量比率,限制在 0-10倍 "volume_": (0, 10.0), - # 市场状态类因子 - "market_": (-1.0, 1.0), - # 技术指标类因子(放宽范围,允许原始技术指标值) - "tech_": (-100.0, 100.0), - # 趋势类因子(放宽范围,允许原始趋势指标值) - "trend_": (-10.0, 10.0), - # 微观结构类因子 + "volume_ratio": (0, 10.0), + + # 市场状态类因子:标准化状态指标,限制在 -3到3 + "market_": (-3.0, 3.0), + + # 技术指标类因子:具体技术指标的范围限制 + "tech_rsi": (0, 100.0), # RSI指标范围 0-100 + "tech_macd": (-0.5, 0.5), # MACD信号范围 + "tech_bb": (-3.0, 3.0), # 布林带位置,标准差倍数 + "tech_obv": (-10.0, 10.0), # OBV动量标准化 + "tech_pv": (-1.0, 1.0), # 量价趋势相关性 + + # 趋势类因子:趋势强度指标 + "trend_": (-3.0, 3.0), + "trend_ma": (-0.5, 0.5), # 均线交叉 + "trend_adx": (0, 100.0), # ADX趋势强度 0-100 + + # 微观结构类因子:标准化微观指标 "micro_": (-1.0, 1.0), - # 情绪类因子 + + # 情绪类因子:情绪指标标准化 "sent_": (-1.0, 1.0), + + # 风险类因子:风险惩罚因子 + "risk_": (0, 1.0), + + # 价格比率类因子:价格与均线比率,限制在 0.5-2.0 (50%-200%) + "price_ma_": (0.5, 2.0), + + # 成交量比率类因子:成交量与均线比率,限制在 0.1-10.0 + "volume_ma_": (0.1, 10.0), } def validate_factor_value( @@ -61,19 +88,76 @@ def validate_factor_value( ) return None - # 根据因子类型应用不同的限制 + # 优先检查精确匹配的因子名称 + exact_matches = { + # 技术指标精确范围 + "tech_rsi_14": (0, 100.0), # RSI指标范围 0-100 + "tech_macd_signal": (-0.5, 0.5), # MACD信号范围 + "tech_bb_position": (-3.0, 3.0), # 布林带位置,标准差倍数 + "tech_obv_momentum": (-10.0, 10.0), # OBV动量标准化 + "tech_pv_trend": (-1.0, 1.0), # 量价趋势相关性 + + # 趋势指标精确范围 + "trend_adx": (0, 100.0), # ADX趋势强度 0-100 + "trend_ma_cross": (-1.0, 1.0), # 均线交叉 + "trend_price_channel": (0, 1.0), # 价格通道位置 + + # 波动率指标精确范围 + "vol_garch": (0, 0.5), # GARCH波动率预测,限制在50%以内 + "vol_range_pred": (0, 0.2), # 波动率范围预测,限制在20%以内 + "vol_regime": (0, 1.0), # 波动率状态,0-1之间 + + # 微观结构精确范围 + "micro_tick_direction": (0, 1.0), # 买卖方向比例 + "micro_trade_imbalance": (-1.0, 1.0), # 交易不平衡度 + + # 情绪指标精确范围 + "sent_impact": (0, 1.0), # 情绪影响度 + "sent_divergence": (-1.0, 1.0), # 情绪分歧度 + } + + # 检查精确匹配 + if name in exact_matches: + min_val, max_val = exact_matches[name] + if min_val <= value <= max_val: + return value + else: + LOGGER.warning( + "因子值超出精确范围 factor=%s value=%f range=[%f,%f] ts_code=%s date=%s", + name, value, min_val, max_val, ts_code, trade_date, + extra=LOG_EXTRA + ) + return None + + # 检查前缀模式匹配 for prefix, (min_val, max_val) in FACTOR_LIMITS.items(): if name.startswith(prefix): - if value < min_val or value > max_val: + if min_val <= value <= max_val: + return value + else: LOGGER.warning( - "因子值超出范围 factor=%s value=%f range=[%f,%f] ts_code=%s date=%s", + "因子值超出前缀范围 factor=%s value=%f range=[%f,%f] ts_code=%s date=%s", name, value, min_val, max_val, ts_code, trade_date, extra=LOG_EXTRA ) return None - break - - return value + + # 如果没有匹配,使用更严格的默认范围 + default_min, default_max = -5.0, 5.0 + if default_min <= value <= default_max: + LOGGER.debug( + "因子使用默认范围验证通过 factor=%s value=%f ts_code=%s date=%s", + name, value, ts_code, trade_date, + extra=LOG_EXTRA + ) + return value + else: + LOGGER.warning( + "因子值超出默认范围 factor=%s value=%f range=[%f,%f] ts_code=%s date=%s", + name, value, default_min, default_max, ts_code, trade_date, + extra=LOG_EXTRA + ) + return None def detect_outliers( values: Dict[str, float], diff --git a/app/ui/progress_state.py b/app/ui/progress_state.py new file mode 100644 index 0000000..fafcedd --- /dev/null +++ b/app/ui/progress_state.py @@ -0,0 +1,194 @@ +"""UI进度状态管理模块,用于因子计算进度同步。""" +from __future__ import annotations + +from typing import Optional, Dict, Any +import streamlit as st + + +class FactorProgressState: + """因子计算进度状态管理类""" + + def __init__(self): + """初始化进度状态""" + if 'factor_progress' not in st.session_state: + st.session_state.factor_progress = { + 'current': 0, + 'total': 0, + 'percentage': 0.0, + 'current_batch': 0, + 'total_batches': 0, + 'status': 'idle', # idle, running, completed, error + 'message': '', + 'start_time': None, + 'elapsed_time': 0 + } + + def start_calculation(self, total_securities: int, total_batches: int) -> None: + """开始因子计算 + + Args: + total_securities: 总证券数量 + total_batches: 总批次数 + """ + st.session_state.factor_progress.update({ + 'current': 0, + 'total': total_securities, + 'percentage': 0.0, + 'current_batch': 0, + 'total_batches': total_batches, + 'status': 'running', + 'message': '开始因子计算...', + 'start_time': st.session_state.get('factor_progress', {}).get('start_time'), + 'elapsed_time': 0 + }) + + def update_progress(self, current_securities: int, current_batch: int, + message: str = '') -> None: + """更新计算进度 + + Args: + current_securities: 当前已处理证券数量 + current_batch: 当前批次 + message: 进度消息 + """ + progress = st.session_state.factor_progress + + # 计算百分比 + if progress['total'] > 0: + percentage = (current_securities / progress['total']) * 100 + else: + percentage = 0.0 + + # 更新状态 + progress.update({ + 'current': current_securities, + 'current_batch': current_batch, + 'percentage': percentage, + 'message': message or f'处理批次 {current_batch}/{progress["total_batches"]}', + 'status': 'running' + }) + + def complete_calculation(self, message: str = '因子计算完成') -> None: + """完成因子计算 + + Args: + message: 完成消息 + """ + progress = st.session_state.factor_progress + progress.update({ + 'current': progress['total'], + 'percentage': 100.0, + 'status': 'completed', + 'message': message + }) + + def error_occurred(self, error_message: str) -> None: + """发生错误 + + Args: + error_message: 错误消息 + """ + st.session_state.factor_progress.update({ + 'status': 'error', + 'message': f'错误: {error_message}' + }) + + def get_progress_info(self) -> Dict[str, Any]: + """获取当前进度信息 + + Returns: + 进度信息字典 + """ + return st.session_state.factor_progress.copy() + + def reset(self) -> None: + """重置进度状态""" + st.session_state.factor_progress = { + 'current': 0, + 'total': 0, + 'percentage': 0.0, + 'current_batch': 0, + 'total_batches': 0, + 'status': 'idle', + 'message': '', + 'start_time': None, + 'elapsed_time': 0 + } + + +# 全局进度状态实例 +factor_progress = FactorProgressState() + + +def render_factor_progress() -> None: + """渲染因子计算进度组件""" + progress_info = factor_progress.get_progress_info() + + if progress_info['status'] == 'idle': + return + + # 创建进度显示区域 + with st.container(): + st.subheader("📊 因子计算进度") + + # 进度条 + if progress_info['status'] == 'running': + st.progress(progress_info['percentage'] / 100.0) + + # 进度信息 + col1, col2, col3 = st.columns(3) + + with col1: + st.metric( + "处理进度", + f"{progress_info['current']}/{progress_info['total']}", + f"{progress_info['percentage']:.1f}%" + ) + + with col2: + st.metric( + "批次进度", + f"{progress_info['current_batch']}/{progress_info['total_batches']}", + "批次" + ) + + with col3: + status_icon = { + 'running': '🔄', + 'completed': '✅', + 'error': '❌', + 'idle': '⏸️' + }.get(progress_info['status'], '⏸️') + st.metric( + "状态", + progress_info['status'].capitalize(), + status_icon + ) + + # 消息显示 + if progress_info['message']: + st.info(progress_info['message']) + + # 错误状态特殊处理 + if progress_info['status'] == 'error': + st.error("因子计算过程中发生错误,请检查日志") + elif progress_info['status'] == 'completed': + st.success("因子计算已完成") + + +def get_factor_progress_percentage() -> float: + """获取因子计算进度百分比 + + Returns: + 进度百分比 (0-100) + """ + return factor_progress.get_progress_info()['percentage'] + + +def is_factor_calculation_running() -> bool: + """检查因子计算是否正在进行 + + Returns: + 是否正在进行因子计算 + """ + return factor_progress.get_progress_info()['status'] == 'running' \ No newline at end of file diff --git a/app/ui/streamlit_app.py b/app/ui/streamlit_app.py index 78a046b..7c84834 100644 --- a/app/ui/streamlit_app.py +++ b/app/ui/streamlit_app.py @@ -14,6 +14,7 @@ from app.data.schema import initialize_database from app.ingest.checker import run_boot_check from app.ingest.rss import ingest_configured_rss from app.ui.portfolio_config import render_portfolio_config +from app.ui.progress_state import render_factor_progress from app.ui.shared import LOGGER, LOG_EXTRA from app.ui.views import ( render_backtest_review, @@ -65,6 +66,9 @@ def main() -> None: st.error(f"❌ 自动数据更新失败:{exc}") render_global_dashboard() + + # 显示因子计算进度 + render_factor_progress() tabs = st.tabs(["今日计划", "投资池/仓位", "回测与复盘", "行情可视化", "日志钻取", "数据与设置", "自检测试"]) LOGGER.debug( diff --git a/test_factor_ranges.py b/test_factor_ranges.py new file mode 100644 index 0000000..1db0b79 --- /dev/null +++ b/test_factor_ranges.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python3 +"""测试因子值范围验证功能""" + +from app.features.factors import compute_factors +from datetime import date + +def test_factor_ranges(): + """测试因子值范围验证功能""" + print('测试改进后的因子值范围验证功能...') + + try: + results = compute_factors( + date(2024, 1, 15), + ts_codes=['000001.SZ', '000002.SZ'], + skip_existing=False, + batch_size=10 + ) + + print(f'因子计算完成,共计算 {len(results)} 个结果') + + # 检查每个因子的值范围 + valid_count = 0 + invalid_count = 0 + + for result in results: + print(f'\n证券 {result.ts_code} 的因子值:') + for factor_name, value in result.values.items(): + if value is not None: + # 检查值是否在合理范围内 + if -10 <= value <= 10: # 放宽检查范围,主要看验证逻辑 + print(f' ✓ {factor_name}: {value:.6f}') + valid_count += 1 + else: + print(f' ✗ {factor_name}: {value:.6f} (超出范围!)') + invalid_count += 1 + + print(f'\n验证统计:') + print(f' 有效因子值: {valid_count}') + print(f' 无效因子值: {invalid_count}') + print(f' 总因子值: {valid_count + invalid_count}') + + if invalid_count == 0: + print('\n✅ 所有因子值都在合理范围内,验证通过!') + else: + print(f'\n⚠️ 发现 {invalid_count} 个超出范围的因子值,需要进一步优化') + + print('\n✅ 因子值范围验证测试完成') + + except Exception as e: + print(f'❌ 测试失败: {e}') + import traceback + traceback.print_exc() + +if __name__ == "__main__": + test_factor_ranges() \ No newline at end of file