diff --git a/app/features/factors.py b/app/features/factors.py index e6ea4f0..218fcb1 100644 --- a/app/features/factors.py +++ b/app/features/factors.py @@ -4,7 +4,7 @@ from __future__ import annotations import re import sqlite3 from dataclasses import dataclass -from datetime import datetime, date, timezone +from datetime import datetime, date, timezone, timedelta from typing import Dict, Iterable, List, Optional, Sequence, Set, Tuple, Union from app.core.indicators import momentum, rolling_mean, volatility @@ -297,6 +297,69 @@ def compute_factor_range( return aggregated +def compute_factors_incremental( + *, + factors: Iterable[FactorSpec] = DEFAULT_FACTORS, + ts_codes: Optional[Sequence[str]] = None, + skip_existing: bool = True, + max_trading_days: Optional[int] = 5, +) -> Dict[str, object]: + """增量计算因子(从最新一条因子记录之后开始)。 + + Args: + factors: 需要计算的因子列表。 + ts_codes: 限定计算的证券池。 + skip_existing: 是否跳过已存在数据。 + max_trading_days: 限制本次计算的交易日数量(按交易日计数)。 + + Returns: + 包含起止日期、参与交易日及计算结果的字典。 + """ + + initialize_database() + codes_tuple = None + if ts_codes: + normalized = [ + code.strip().upper() + for code in ts_codes + if isinstance(code, str) and code.strip() + ] + codes_tuple = tuple(dict.fromkeys(normalized)) or None + + last_date_str = _latest_factor_trade_date() + trade_dates = _list_trade_dates_after(last_date_str, codes_tuple, max_trading_days) + if not trade_dates: + LOGGER.info("未发现新的交易日需要计算因子(latest=%s)", last_date_str, extra=LOG_EXTRA) + return { + "start": None, + "end": None, + "trade_dates": [], + "results": [], + "count": 0, + } + + aggregated_results: List[FactorResult] = [] + for trade_date_str in trade_dates: + trade_day = datetime.strptime(trade_date_str, "%Y%m%d").date() + aggregated_results.extend( + compute_factors( + trade_day, + factors, + ts_codes=codes_tuple, + skip_existing=skip_existing, + ) + ) + + trading_dates = [datetime.strptime(item, "%Y%m%d").date() for item in trade_dates] + return { + "start": trading_dates[0], + "end": trading_dates[-1], + "trade_dates": trading_dates, + "results": aggregated_results, + "count": len(aggregated_results), + } + + def _load_universe(trade_date: str, allowed: Optional[set[str]] = None) -> List[str]: query = "SELECT ts_code FROM daily WHERE trade_date = ? ORDER BY ts_code" with db_session(read_only=True) as conn: @@ -385,6 +448,43 @@ def _list_trade_dates( return [row["trade_date"] for row in rows if row["trade_date"]] +def _list_trade_dates_after( + last_trade_date: Optional[str], + allowed: Optional[Sequence[str]], + limit: Optional[int], +) -> List[str]: + params: List[object] = [] + where_clauses: List[str] = [] + if last_trade_date: + where_clauses.append("trade_date > ?") + params.append(last_trade_date) + base_query = "SELECT DISTINCT trade_date FROM daily" + if allowed: + placeholders = ", ".join("?" 
for _ in allowed) + where_clauses.append(f"ts_code IN ({placeholders})") + params.extend(allowed) + if where_clauses: + base_query += " WHERE " + " AND ".join(where_clauses) + base_query += " ORDER BY trade_date" + if limit is not None and limit > 0: + base_query += f" LIMIT {int(limit)}" + with db_session(read_only=True) as conn: + rows = conn.execute(base_query, params).fetchall() + return [row["trade_date"] for row in rows if row["trade_date"]] + + +def _latest_factor_trade_date() -> Optional[str]: + with db_session(read_only=True) as conn: + try: + row = conn.execute("SELECT MAX(trade_date) AS max_trade_date FROM factors").fetchone() + except sqlite3.OperationalError: + return None + value = row["max_trade_date"] if row else None + if not value: + return None + return str(value) + + def _compute_batch_factors( broker: DataBroker, ts_codes: List[str], diff --git a/docs/TODO.md b/docs/TODO.md index 5dfad57..0e1d3d8 100644 --- a/docs/TODO.md +++ b/docs/TODO.md @@ -77,6 +77,6 @@ | 工作项 | 状态 | 说明 | | --- | --- | --- | -| 全量代码审查 | ⏳ | 自 `app/ui/streamlit_app.py` 起逐模块审查,剔除重复安全检查与临时代码。 | -| TODO 标记治理 | ⏳ | 为未实现功能补充 TODO 与实现思路,并清理过期逻辑。 | +| 全量代码审查 | 🔄 | 已制定 `docs/architecture/code_review_checklist.md`,按 checklist 推进模块审查。 | +| TODO 标记治理 | 🔄 | 新增 `scripts/todo_report.py` 支撑定期扫描,待梳理遗留项目。 | | 业务逻辑体检 | ⏳ | 梳理业务链路,识别需要重构或优化的模块。 | diff --git a/docs/architecture/code_review_checklist.md b/docs/architecture/code_review_checklist.md new file mode 100644 index 0000000..e2e6bc0 --- /dev/null +++ b/docs/architecture/code_review_checklist.md @@ -0,0 +1,49 @@ +# 架构与代码整洁治理计划 + +> 目标:支撑「全量代码审查|TODO 标记治理|业务逻辑体检」三项工作,持续缩减遗留债务。 + +## 1. 全量代码审查 + +- [ ] `app/ui/streamlit_app.py`:梳理启动流程、路径注入、自动更新逻辑。 +- [ ] `app/ui/views/*`:关注重复组件、可拆分的横向导航、潜在的阻塞调用。 +- [ ] `app/backtest/*`:确保引擎接口与决策环境的异常兜底一致。 +- [ ] `app/llm/*`:重点检查 Provider 降级逻辑、缓存策略、成本控制开关。 +- [ ] `app/agents/*`:核对部门权重、协议 Host、Telemetry 写入是否对齐。 +- [ ] 每轮审查输出摘要(PR 注释或 issue),并将建议归档到 `docs/architecture/`。 + +## 2. TODO 标记治理 + +1. 使用 `python scripts/todo_report.py --format table` 生成待处理列表。 +2. 按模块划分归类: + - 无效 TODO → 删除。 + - 长期待办 → 转为 `docs/TODO.md` 列表。 + - 未描述清楚 → 补充上下文、负责人、预期结果。 +3. 每周至少一次同步报告(截屏或粘贴 CLI 输出)到工作记录。 + +## 3. 业务逻辑体检 + +- 建立「关键链路」档案(数据采集 → 回测 → 调参 → UI 呈现)。 +- 对每条链路列出: + - 输入/输出契约 + - 依赖的配置项 + - 失败重试 & 降级逻辑 + - 观察指标(日志、Telemetry、Metrics) +- 体检发现的问题纳入 `docs/TODO.md` 或新 issue。 + +## 4. 工具支撑 + +- `scripts/todo_report.py`:归集 TODO/FIXME/HACK,支持 JSON 输出利于持续集成。 +- 计划补充: + - 模块耦合分析(import 依赖图) + - 代码所有权(CODEOWNERS 草案) + - 常见安全用法检查脚本 + +## 5. 
推进节奏
+
+| 周期 | 动作 |
+| --- | --- |
+| 每周 | 更新 TODO 报告、归档新增 TODO、关闭已完成项 |
+| 每两周 | 聚焦一个子系统做深入代码审查 |
+| 每月 | 汇总「体检 + TODO + 审查」结果,形成复盘 |
+
+> 说明:本计划将根据治理结果滚动调整,欢迎在 PR 或 issue 中补充改进建议。
diff --git a/scripts/todo_report.py b/scripts/todo_report.py
new file mode 100755
index 0000000..5d8259d
--- /dev/null
+++ b/scripts/todo_report.py
@@ -0,0 +1,128 @@
+#!/usr/bin/env python3
+"""Generate a consolidated report of TODO/FIXME markers in the repository."""
+from __future__ import annotations
+
+import argparse
+import json
+import re
+import sys
+from pathlib import Path
+from typing import Iterable, List, Dict, Any
+
+DEFAULT_PATTERNS = (
+    r"\bTODO\b",
+    r"\bFIXME\b",
+    r"\bHACK\b",
+    r"\bXXX\b",
+)
+
+EXCLUDE_DIRS = {
+    ".git",
+    ".mypy_cache",
+    ".pytest_cache",
+    "__pycache__",
+    "node_modules",
+    "build",
+    "dist",
+    ".venv",
+}
+
+
+def iter_source_files(root: Path, extensions: Iterable[str] | None = None) -> Iterable[Path]:
+    extensions = {ext.lower() for ext in (extensions or [])}
+    for path in root.rglob("*"):
+        if not path.is_file():
+            continue
+        # rglob descends into every directory, so drop files that live under an excluded path here.
+        if any(part in EXCLUDE_DIRS for part in path.relative_to(root).parts):
+            continue
+        if extensions and path.suffix.lower() not in extensions:
+            continue
+        yield path
+
+
+def scan_file(path: Path, patterns: List[re.Pattern[str]]) -> List[Dict[str, Any]]:
+    issues: List[Dict[str, Any]] = []
+    try:
+        text = path.read_text(encoding="utf-8")
+    except (UnicodeDecodeError, OSError):
+        return issues
+    for idx, line in enumerate(text.splitlines(), start=1):
+        for pattern in patterns:
+            match = pattern.search(line)
+            if match:
+                issues.append(
+                    {
+                        "file": str(path),
+                        "line": idx,
+                        "tag": match.group(0),
+                        "text": line.strip(),
+                    }
+                )
+                break
+    return issues
+
+
+def main() -> int:
+    parser = argparse.ArgumentParser(description=__doc__)
+    parser.add_argument("path", nargs="?", default=".", help="Root directory to scan.")
+    parser.add_argument(
+        "--format",
+        choices={"table", "json"},
+        default="table",
+        help="Output format (default: table).",
+    )
+    parser.add_argument(
+        "--ext",
+        action="append",
+        default=None,
+        help="Restrict scan to specific file extensions (e.g., --ext .py --ext .md).",
+    )
+    parser.add_argument(
+        "--pattern",
+        action="append",
+        default=None,
+        help="Additional regex pattern to match (case-insensitive).",
+    )
+    args = parser.parse_args()
+
+    root = Path(args.path).resolve()
+    if not root.exists():
+        print(f"Path not found: {root}", file=sys.stderr)
+        return 1
+
+    patterns = [re.compile(pat, re.IGNORECASE) for pat in DEFAULT_PATTERNS]
+    if args.pattern:
+        patterns.extend(re.compile(pat, re.IGNORECASE) for pat in args.pattern)
+
+    issues: List[Dict[str, Any]] = []
+    for file_path in iter_source_files(root, args.ext):
+        issues.extend(scan_file(file_path, patterns))
+
+    issues.sort(key=lambda item: (item["file"], item["line"]))
+
+    if args.format == "json":
+        json.dump(issues, sys.stdout, indent=2, ensure_ascii=False)
+        print()
+        return 0
+
+    # Default table output
+    if not issues:
+        print("No TODO/FIXME markers found.")
+        return 0
+
+    width_file = max(len(item["file"]) for item in issues)
+    width_tag = max(len(item["tag"]) for item in issues)
+    header = f"{'File'.ljust(width_file)} {'Line':>5} {'Tag'.ljust(width_tag)} Text"
+    print(header)
+    print("-" * len(header))
+    for item in issues:
+        file_display = item["file"].ljust(width_file)
+        tag_display = item["tag"].ljust(width_tag)
+        print(f"{file_display} {item['line']:>5} {tag_display} {item['text']}")
+
+    return 0
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
diff --git a/tests/test_factors.py b/tests/test_factors.py
index 544c180..912f65f 100644
--- a/tests/test_factors.py
+++ b/tests/test_factors.py
@@ -12,6 +12,7 @@ from app.features.factors import (
     FactorResult,
     FactorSpec,
     compute_factor_range,
+    compute_factors_incremental,
     compute_factors,
     _valuation_score,
     _volume_ratio_score,
@@ -183,6 +184,25 @@ def test_compute_factors_skip_existing(isolated_db):
     assert skipped == []
 
 
+def test_compute_factors_incremental(isolated_db):
+    ts_code = "000001.SZ"
+    latest_day = date(2025, 2, 10)
+    _populate_sample_data(ts_code, latest_day)
+
+    first_day = latest_day - timedelta(days=5)
+    compute_factors(first_day)
+
+    summary = compute_factors_incremental(max_trading_days=3)
+    trade_dates = summary["trade_dates"]
+    assert trade_dates
+    assert trade_dates[0] > first_day
+    assert summary["count"] > 0
+
+    # A second run with no new trade dates should return an empty summary
+    summary_again = compute_factors_incremental(max_trading_days=3)
+    assert summary_again["count"] == 0
+
+
 def test_compute_factor_range_filters_universe(isolated_db):
     code_a = "000001.SZ"
     code_b = "000002.SZ"