diff --git a/README.md b/README.md index cf028bc..e1a4244 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,16 @@ - **数据覆盖自检**:`app/ingest/tushare.py` 封装 TuShare 拉取、增量更新与覆盖统计,`app/ingest/checker.py` 提供强制补数与窗口化覆盖报告。 - **事件驱动回测**:`app/backtest/engine.py` 构建日频回测循环,将代理决策与投资组合状态解耦,便于扩展成交撮合与绩效统计。 - **可视化与解释**:`app/ui/streamlit_app.py` 提供四大页签(今日计划、回测与复盘、数据与设置、自检测试),结合 Plotly 图形展示和 `app/llm` 提示卡片生成器,支撑人机协作分析。 +- **统一日志与持久化**:SQLite 统一存储行情、回测与日志,配合 `DatabaseLogHandler` 在 UI/抓数流程中输出结构化运行轨迹,支持快速追踪与复盘。 +- **跨市场数据扩展**:`app/ingest/tushare.py` 追加指数、ETF/公募基金、期货、外汇、港股与美股的增量拉取逻辑,确保多资产因子与宏观代理所需的行情基础数据齐备。 + +## LLM + 多智能体最佳实践 + +- **强化结构化特征**:除 A 股行情外,引入资金流、因子、宏观等数据,为六类代理提供更丰富上下文。 +- **场景化 Prompt**:在 `app/llm` 中注入代理贡献、宏观状态和风险事件,让 LLM 输出的策略解释与信号一致。 +- **闭环反馈机制**:将回测或实盘的真实收益、成交等结果写回 SQLite,用于调整代理权重与 Prompt 语料。 +- **多层日志监控**:保留代理评分、决策信心、LLM 提示与 UI 操作日志,帮助定位“谁做的决策、为何失败”。 +- **人机协同流程**:在 UI 呈现代理分歧与 LLM 风险提示,分析师可调权、重跑或复核,实现人在环路的策略流程。 ## 环境依赖与安装 diff --git a/app/data/schema.py b/app/data/schema.py index 7bcbe54..4440d0c 100644 --- a/app/data/schema.py +++ b/app/data/schema.py @@ -102,6 +102,167 @@ SCHEMA_STATEMENTS: Iterable[str] = ( ); """, """ + CREATE TABLE IF NOT EXISTS index_basic ( + ts_code TEXT PRIMARY KEY, + name TEXT, + fullname TEXT, + market TEXT, + publisher TEXT, + index_type TEXT, + category TEXT, + base_date TEXT, + base_point REAL, + list_date TEXT, + weight_rule TEXT, + desc TEXT, + exp_date TEXT + ); + """, + """ + CREATE TABLE IF NOT EXISTS index_daily ( + ts_code TEXT, + trade_date TEXT, + close REAL, + open REAL, + high REAL, + low REAL, + pre_close REAL, + change REAL, + pct_chg REAL, + vol REAL, + amount REAL, + PRIMARY KEY (ts_code, trade_date) + ); + """, + """ + CREATE TABLE IF NOT EXISTS fund_basic ( + ts_code TEXT PRIMARY KEY, + name TEXT, + management TEXT, + custodian TEXT, + fund_type TEXT, + found_date TEXT, + due_date TEXT, + list_date TEXT, + issue_date TEXT, + delist_date TEXT, + issue_amount REAL, + m_fee REAL, + c_fee REAL, + benchmark TEXT, + status TEXT, + invest_type TEXT, + type TEXT, + trustee TEXT, + purc_start_date TEXT, + redm_start_date TEXT, + market TEXT + ); + """, + """ + CREATE TABLE IF NOT EXISTS fund_nav ( + ts_code TEXT, + nav_date TEXT, + ann_date TEXT, + unit_nav REAL, + accum_nav REAL, + accum_div REAL, + net_asset REAL, + total_netasset REAL, + adj_nav REAL, + update_flag TEXT, + PRIMARY KEY (ts_code, nav_date) + ); + """, + """ + CREATE TABLE IF NOT EXISTS fut_basic ( + ts_code TEXT PRIMARY KEY, + symbol TEXT, + name TEXT, + exchange TEXT, + exchange_full_name TEXT, + product TEXT, + product_name TEXT, + variety TEXT, + list_date TEXT, + delist_date TEXT, + trade_unit REAL, + per_unit REAL, + quote_unit TEXT, + settle_month TEXT, + contract_size REAL, + tick_size REAL, + margin_rate REAL, + margin_ratio REAL, + delivery_month TEXT, + delivery_day TEXT + ); + """, + """ + CREATE TABLE IF NOT EXISTS fut_daily ( + ts_code TEXT, + trade_date TEXT, + pre_settle REAL, + open REAL, + high REAL, + low REAL, + close REAL, + settle REAL, + change1 REAL, + change2 REAL, + vol REAL, + amount REAL, + oi REAL, + oi_chg REAL, + PRIMARY KEY (ts_code, trade_date) + ); + """, + """ + CREATE TABLE IF NOT EXISTS fx_daily ( + ts_code TEXT, + trade_date TEXT, + bid REAL, + ask REAL, + mid REAL, + high REAL, + low REAL, + PRIMARY KEY (ts_code, trade_date) + ); + """, + """ + CREATE TABLE IF NOT EXISTS hk_daily ( + ts_code TEXT, + trade_date TEXT, + close REAL, + open REAL, + high REAL, + low REAL, + pre_close REAL, + change REAL, + pct_chg REAL, + vol REAL, + amount REAL, + exchange TEXT, + PRIMARY KEY (ts_code, trade_date) + ); + """, + """ + CREATE TABLE IF NOT EXISTS us_daily ( + ts_code TEXT, + trade_date TEXT, + close REAL, + open REAL, + high REAL, + low REAL, + pre_close REAL, + change REAL, + pct_chg REAL, + vol REAL, + amount REAL, + PRIMARY KEY (ts_code, trade_date) + ); + """, + """ CREATE TABLE IF NOT EXISTS news ( id TEXT PRIMARY KEY, ts_code TEXT, @@ -212,6 +373,15 @@ REQUIRED_TABLES = ( "suspend", "trade_calendar", "stk_limit", + "index_basic", + "index_daily", + "fund_basic", + "fund_nav", + "fut_basic", + "fut_daily", + "fx_daily", + "hk_daily", + "us_daily", "news", "heat_daily", "bt_config", diff --git a/app/ingest/tushare.py b/app/ingest/tushare.py index 814b729..20612ad 100644 --- a/app/ingest/tushare.py +++ b/app/ingest/tushare.py @@ -2,8 +2,9 @@ from __future__ import annotations import os +import sqlite3 import time -from collections import deque +from collections import defaultdict, deque from dataclasses import dataclass from datetime import date from typing import Callable, Dict, Iterable, List, Optional, Sequence, Set, Tuple @@ -27,6 +28,84 @@ API_DEFAULT_LIMIT = 5000 LOG_EXTRA = {"stage": "data_ingest"} _CALL_QUEUE = deque() +_CALL_BUCKETS: Dict[str, deque] = defaultdict(deque) + +API_RATE_LIMITS: Dict[str, int] = { + "stock_basic": 180, + "daily": 480, + "daily_basic": 200, + "adj_factor": 200, + "suspend_d": 180, + "suspend": 180, + "stk_limit": 200, + "trade_cal": 200, + "index_basic": 120, + "index_daily": 240, + "fund_basic": 120, + "fund_nav": 200, + "fut_basic": 120, + "fut_daily": 200, + "fx_daily": 200, + "hk_daily": 2, + "us_daily": 200, +} + + +INDEX_CODES: Tuple[str, ...] = ( + "000001.SH", # 上证综指 + "000300.SH", # 沪深300 + "000016.SH", # 上证50 + "000905.SH", # 中证500 + "399001.SZ", # 深证成指 + "399005.SZ", # 中小板指 + "399006.SZ", # 创业板指 + "HSI.HI", # 恒生指数 + "SPX.GI", # 标普500 + "DJI.GI", # 道琼斯工业指数 + "IXIC.GI", # 纳斯达克综合指数 + "GDAXI.GI", # 德国DAX + "FTSE.GI", # 英国富时100 +) + +ETF_CODES: Tuple[str, ...] = ( + "510300.SH", # 华泰柏瑞沪深300ETF + "510500.SH", # 南方中证500ETF + "159915.SZ", # 易方达创业板ETF +) + +FUND_CODES: Tuple[str, ...] = ( + "000001.OF", # 华夏成长 + "110022.OF", # 易方达消费行业 +) + +FUTURE_CODES: Tuple[str, ...] = ( + "IF9999.CFE", # 沪深300股指期货主力 + "IC9999.CFE", # 中证500股指期货主力 + "IH9999.CFE", # 上证50股指期货主力 +) + +FX_CODES: Tuple[str, ...] = ( + "USDCNY", # 美元人民币 + "EURCNY", # 欧元人民币 +) + +HK_CODES: Tuple[str, ...] = ( + "00700.HK", # 腾讯控股 + "00941.HK", # 中国移动 + "09618.HK", # 京东集团-SW + "09988.HK", # 阿里巴巴-SW + "03690.HK", # 美团-W +) + +US_CODES: Tuple[str, ...] = ( + "AAPL.O", # 苹果 + "MSFT.O", # 微软 + "BABA.N", # 阿里巴巴美股 + "JD.O", # 京东美股 + "PDD.O", # 拼多多 + "BIDU.O", # 百度 + "BILI.O", # 哔哩哔哩 +) def _normalize_date_str(value: Optional[str]) -> Optional[str]: @@ -36,19 +115,33 @@ def _normalize_date_str(value: Optional[str]) -> Optional[str]: return text or None -def _respect_rate_limit(cfg) -> None: +def _respect_rate_limit(endpoint: str | None, cfg) -> None: + def _throttle(queue: deque, limit: int) -> None: + if limit <= 0: + return + now = time.time() + window = 60.0 + while queue and now - queue[0] > window: + queue.popleft() + if len(queue) >= limit: + sleep_time = window - (now - queue[0]) + 0.1 + LOGGER.debug( + "触发限频控制(limit=%s)休眠 %.2f 秒 endpoint=%s", + limit, + sleep_time, + endpoint, + extra=LOG_EXTRA, + ) + time.sleep(max(0.1, sleep_time)) + queue.append(time.time()) + max_calls = cfg.max_calls_per_minute - if max_calls <= 0: - return - now = time.time() - window = 60.0 - while _CALL_QUEUE and now - _CALL_QUEUE[0] > window: - _CALL_QUEUE.popleft() - if len(_CALL_QUEUE) >= max_calls: - sleep_time = window - (now - _CALL_QUEUE[0]) + 0.1 - LOGGER.debug("触发限频控制,休眠 %.2f 秒", sleep_time, extra=LOG_EXTRA) - time.sleep(max(0.1, sleep_time)) - _CALL_QUEUE.append(time.time()) + if max_calls > 0: + _throttle(_CALL_QUEUE, max_calls) + + bucket_key = endpoint or "_default" + endpoint_limit = API_RATE_LIMITS.get(bucket_key, max_calls) + _throttle(_CALL_BUCKETS[bucket_key], endpoint_limit or 0) def _df_to_records(df: pd.DataFrame, allowed_cols: List[str]) -> List[Dict]: @@ -72,7 +165,7 @@ def _fetch_paginated(endpoint: str, params: Dict[str, object], limit: int | None extra=LOG_EXTRA, ) while True: - _respect_rate_limit(get_config()) + _respect_rate_limit(endpoint, get_config()) call = getattr(client, endpoint) try: df = call(limit=limit, offset=offset, **clean_params) @@ -218,6 +311,167 @@ _TABLE_SCHEMAS: Dict[str, str] = { PRIMARY KEY (ts_code, trade_date) ); """, + "index_basic": """ + CREATE TABLE IF NOT EXISTS index_basic ( + ts_code TEXT PRIMARY KEY, + name TEXT, + fullname TEXT, + market TEXT, + publisher TEXT, + index_type TEXT, + category TEXT, + base_date TEXT, + base_point REAL, + list_date TEXT, + weight_rule TEXT, + desc TEXT, + exp_date TEXT + ); + """, + "index_daily": """ + CREATE TABLE IF NOT EXISTS index_daily ( + ts_code TEXT, + trade_date TEXT, + close REAL, + open REAL, + high REAL, + low REAL, + pre_close REAL, + change REAL, + pct_chg REAL, + vol REAL, + amount REAL, + PRIMARY KEY (ts_code, trade_date) + ); + """, + "fund_basic": """ + CREATE TABLE IF NOT EXISTS fund_basic ( + ts_code TEXT PRIMARY KEY, + name TEXT, + management TEXT, + custodian TEXT, + fund_type TEXT, + found_date TEXT, + due_date TEXT, + list_date TEXT, + issue_date TEXT, + delist_date TEXT, + issue_amount REAL, + m_fee REAL, + c_fee REAL, + benchmark TEXT, + status TEXT, + invest_type TEXT, + type TEXT, + trustee TEXT, + purc_start_date TEXT, + redm_start_date TEXT, + market TEXT + ); + """, + "fund_nav": """ + CREATE TABLE IF NOT EXISTS fund_nav ( + ts_code TEXT, + nav_date TEXT, + ann_date TEXT, + unit_nav REAL, + accum_nav REAL, + accum_div REAL, + net_asset REAL, + total_netasset REAL, + adj_nav REAL, + update_flag TEXT, + PRIMARY KEY (ts_code, nav_date) + ); + """, + "fut_basic": """ + CREATE TABLE IF NOT EXISTS fut_basic ( + ts_code TEXT PRIMARY KEY, + symbol TEXT, + name TEXT, + exchange TEXT, + exchange_full_name TEXT, + product TEXT, + product_name TEXT, + variety TEXT, + list_date TEXT, + delist_date TEXT, + trade_unit REAL, + per_unit REAL, + quote_unit TEXT, + settle_month TEXT, + contract_size REAL, + tick_size REAL, + margin_rate REAL, + margin_ratio REAL, + delivery_month TEXT, + delivery_day TEXT + ); + """, + "fut_daily": """ + CREATE TABLE IF NOT EXISTS fut_daily ( + ts_code TEXT, + trade_date TEXT, + pre_settle REAL, + open REAL, + high REAL, + low REAL, + close REAL, + settle REAL, + change1 REAL, + change2 REAL, + vol REAL, + amount REAL, + oi REAL, + oi_chg REAL, + PRIMARY KEY (ts_code, trade_date) + ); + """, + "fx_daily": """ + CREATE TABLE IF NOT EXISTS fx_daily ( + ts_code TEXT, + trade_date TEXT, + bid REAL, + ask REAL, + mid REAL, + high REAL, + low REAL, + PRIMARY KEY (ts_code, trade_date) + ); + """, + "hk_daily": """ + CREATE TABLE IF NOT EXISTS hk_daily ( + ts_code TEXT, + trade_date TEXT, + close REAL, + open REAL, + high REAL, + low REAL, + pre_close REAL, + change REAL, + pct_chg REAL, + vol REAL, + amount REAL, + exchange TEXT, + PRIMARY KEY (ts_code, trade_date) + ); + """, + "us_daily": """ + CREATE TABLE IF NOT EXISTS us_daily ( + ts_code TEXT, + trade_date TEXT, + close REAL, + open REAL, + high REAL, + low REAL, + pre_close REAL, + change REAL, + pct_chg REAL, + vol REAL, + amount REAL, + PRIMARY KEY (ts_code, trade_date) + ); + """, } _TABLE_COLUMNS: Dict[str, List[str]] = { @@ -293,6 +547,143 @@ _TABLE_COLUMNS: Dict[str, List[str]] = { "up_limit", "down_limit", ], + "index_basic": [ + "ts_code", + "name", + "fullname", + "market", + "publisher", + "index_type", + "category", + "base_date", + "base_point", + "list_date", + "weight_rule", + "desc", + "exp_date", + ], + "index_daily": [ + "ts_code", + "trade_date", + "close", + "open", + "high", + "low", + "pre_close", + "change", + "pct_chg", + "vol", + "amount", + ], + "fund_basic": [ + "ts_code", + "name", + "management", + "custodian", + "fund_type", + "found_date", + "due_date", + "list_date", + "issue_date", + "delist_date", + "issue_amount", + "m_fee", + "c_fee", + "benchmark", + "status", + "invest_type", + "type", + "trustee", + "purc_start_date", + "redm_start_date", + "market", + ], + "fund_nav": [ + "ts_code", + "nav_date", + "ann_date", + "unit_nav", + "accum_nav", + "accum_div", + "net_asset", + "total_netasset", + "adj_nav", + "update_flag", + ], + "fut_basic": [ + "ts_code", + "symbol", + "name", + "exchange", + "exchange_full_name", + "product", + "product_name", + "variety", + "list_date", + "delist_date", + "trade_unit", + "per_unit", + "quote_unit", + "settle_month", + "contract_size", + "tick_size", + "margin_rate", + "margin_ratio", + "delivery_month", + "delivery_day", + ], + "fut_daily": [ + "ts_code", + "trade_date", + "pre_settle", + "open", + "high", + "low", + "close", + "settle", + "change1", + "change2", + "vol", + "amount", + "oi", + "oi_chg", + ], + "fx_daily": [ + "ts_code", + "trade_date", + "bid", + "ask", + "mid", + "high", + "low", + ], + "hk_daily": [ + "ts_code", + "trade_date", + "close", + "open", + "high", + "low", + "pre_close", + "change", + "pct_chg", + "vol", + "amount", + "exchange", + ], + "us_daily": [ + "ts_code", + "trade_date", + "close", + "open", + "high", + "low", + "pre_close", + "change", + "pct_chg", + "vol", + "amount", + ], } @@ -394,8 +785,11 @@ def _range_stats( if ts_code: sql += " AND ts_code = ?" params.append(ts_code) - with db_session(read_only=True) as conn: - row = conn.execute(sql, tuple(params)).fetchone() + try: + with db_session(read_only=True) as conn: + row = conn.execute(sql, tuple(params)).fetchone() + except sqlite3.OperationalError: + return {"min": None, "max": None, "distinct": 0} return { "min": row["min_d"] if row else None, "max": row["max_d"] if row else None, @@ -426,8 +820,11 @@ def _existing_suspend_dates(start_str: str, end_str: str, ts_code: str | None = if ts_code: sql += " AND ts_code = ?" params.append(ts_code) - with db_session(read_only=True) as conn: - rows = conn.execute(sql, tuple(params)).fetchall() + try: + with db_session(read_only=True) as conn: + rows = conn.execute(sql, tuple(params)).fetchall() + except sqlite3.OperationalError: + return set() return {row["suspend_date"] for row in rows if row["suspend_date"]} @@ -477,6 +874,7 @@ def fetch_stock_basic(exchange: Optional[str] = None, list_status: str = "L") -> list_status, extra=LOG_EXTRA, ) + _respect_rate_limit("stock_basic", get_config()) fields = "ts_code,symbol,name,area,industry,market,exchange,list_status,list_date,delist_date" df = client.stock_basic(exchange=exchange, list_status=list_status, fields=fields) return _df_to_records(df, _TABLE_COLUMNS["stock_basic"]) @@ -715,6 +1113,7 @@ def fetch_trade_calendar(start: date, end: date, exchange: str = "SSE") -> Itera end_date, extra=LOG_EXTRA, ) + _respect_rate_limit("trade_cal", get_config()) df = client.trade_cal(exchange=exchange, start_date=start_date, end_date=end_date) if df is not None and not df.empty and "is_open" in df.columns: df["is_open"] = pd.to_numeric(df["is_open"], errors="coerce").fillna(0).astype(int) @@ -757,6 +1156,144 @@ def fetch_stk_limit( return _df_to_records(merged, _TABLE_COLUMNS["stk_limit"]) +def fetch_index_basic(market: Optional[str] = None) -> Iterable[Dict]: + client = _ensure_client() + LOGGER.info("拉取指数基础信息(market=%s)", market or "all", extra=LOG_EXTRA) + _respect_rate_limit("index_basic", get_config()) + df = client.index_basic(market=market) + return _df_to_records(df, _TABLE_COLUMNS["index_basic"]) + + +def fetch_index_daily(start: date, end: date, ts_code: str) -> Iterable[Dict]: + client = _ensure_client() + start_str = _format_date(start) + end_str = _format_date(end) + LOGGER.info( + "拉取指数日线:%s %s-%s", + ts_code, + start_str, + end_str, + extra=LOG_EXTRA, + ) + df = _fetch_paginated( + "index_daily", + {"ts_code": ts_code, "start_date": start_str, "end_date": end_str}, + limit=5000, + ) + return _df_to_records(df, _TABLE_COLUMNS["index_daily"]) + + +def fetch_fund_basic(asset_class: str = "E", status: str = "L") -> Iterable[Dict]: + client = _ensure_client() + LOGGER.info("拉取基金基础信息:asset_class=%s status=%s", asset_class, status, extra=LOG_EXTRA) + _respect_rate_limit("fund_basic", get_config()) + df = client.fund_basic(market=asset_class, status=status) + return _df_to_records(df, _TABLE_COLUMNS["fund_basic"]) + + +def fetch_fund_nav(start: date, end: date, ts_code: str) -> Iterable[Dict]: + client = _ensure_client() + start_str = _format_date(start) + end_str = _format_date(end) + LOGGER.info( + "拉取基金净值:%s %s-%s", + ts_code, + start_str, + end_str, + extra=LOG_EXTRA, + ) + df = _fetch_paginated( + "fund_nav", + {"ts_code": ts_code, "start_date": start_str, "end_date": end_str}, + limit=5000, + ) + return _df_to_records(df, _TABLE_COLUMNS["fund_nav"]) + + +def fetch_fut_basic(exchange: Optional[str] = None) -> Iterable[Dict]: + client = _ensure_client() + LOGGER.info("拉取期货基础信息(exchange=%s)", exchange or "all", extra=LOG_EXTRA) + _respect_rate_limit("fut_basic", get_config()) + df = client.fut_basic(exchange=exchange) + return _df_to_records(df, _TABLE_COLUMNS["fut_basic"]) + + +def fetch_fut_daily(start: date, end: date, ts_code: str) -> Iterable[Dict]: + client = _ensure_client() + start_str = _format_date(start) + end_str = _format_date(end) + LOGGER.info( + "拉取期货日线:%s %s-%s", + ts_code, + start_str, + end_str, + extra=LOG_EXTRA, + ) + df = _fetch_paginated( + "fut_daily", + {"ts_code": ts_code, "start_date": start_str, "end_date": end_str}, + limit=4000, + ) + return _df_to_records(df, _TABLE_COLUMNS["fut_daily"]) + + +def fetch_fx_daily(start: date, end: date, ts_code: str) -> Iterable[Dict]: + client = _ensure_client() + start_str = _format_date(start) + end_str = _format_date(end) + LOGGER.info( + "拉取外汇日线:%s %s-%s", + ts_code, + start_str, + end_str, + extra=LOG_EXTRA, + ) + df = _fetch_paginated( + "fx_daily", + {"ts_code": ts_code, "start_date": start_str, "end_date": end_str}, + limit=4000, + ) + return _df_to_records(df, _TABLE_COLUMNS["fx_daily"]) + + +def fetch_hk_daily(start: date, end: date, ts_code: str) -> Iterable[Dict]: + client = _ensure_client() + start_str = _format_date(start) + end_str = _format_date(end) + LOGGER.info( + "拉取港股日线:%s %s-%s", + ts_code, + start_str, + end_str, + extra=LOG_EXTRA, + ) + df = _fetch_paginated( + "hk_daily", + {"ts_code": ts_code, "start_date": start_str, "end_date": end_str}, + limit=4000, + ) + return _df_to_records(df, _TABLE_COLUMNS["hk_daily"]) + + +def fetch_us_daily(start: date, end: date, ts_code: str) -> Iterable[Dict]: + client = _ensure_client() + start_str = _format_date(start) + end_str = _format_date(end) + LOGGER.info( + "拉取美股日线:%s %s-%s", + ts_code, + start_str, + end_str, + extra=LOG_EXTRA, + ) + df = _fetch_paginated( + "us_daily", + {"ts_code": ts_code, "start_date": start_str, "end_date": end_str}, + limit=4000, + ) + return _df_to_records(df, _TABLE_COLUMNS["us_daily"]) + + def save_records(table: str, rows: Iterable[Dict]) -> None: items = list(rows) if not items: @@ -812,6 +1349,7 @@ def ensure_data_coverage( end: date, ts_codes: Optional[Sequence[str]] = None, include_limits: bool = True, + include_extended: bool = True, force: bool = False, progress_hook: Callable[[str, float], None] | None = None, ) -> None: @@ -819,7 +1357,12 @@ def ensure_data_coverage( start_str = _format_date(start) end_str = _format_date(end) - total_steps = 5 + (1 if include_limits else 0) + extra_steps = 0 + if include_limits: + extra_steps += 1 + if include_extended: + extra_steps += 4 + total_steps = 5 + extra_steps current_step = 0 def advance(message: str) -> None: @@ -864,6 +1407,16 @@ def ensure_data_coverage( "stk_limit": "trade_date", "suspend": "suspend_date", } + date_cols.update( + { + "index_daily": "trade_date", + "fund_nav": "nav_date", + "fut_daily": "trade_date", + "fx_daily": "trade_date", + "hk_daily": "trade_date", + "us_daily": "trade_date", + } + ) def _save_with_codes(table: str, fetch_fn) -> None: date_col = date_cols.get(table, "trade_date") @@ -914,6 +1467,79 @@ def ensure_data_coverage( advance("处理停复牌信息") _save_with_codes("suspend", fetch_suspensions) + if include_extended: + advance("同步指数/基金/期货基础信息") + try: + save_records("index_basic", fetch_index_basic()) + save_records("fund_basic", fetch_fund_basic()) + save_records("fut_basic", fetch_fut_basic()) + except Exception: + LOGGER.exception("扩展基础信息拉取失败") + raise + + advance("拉取指数行情数据") + for code in INDEX_CODES: + try: + if not force and _should_skip_range("index_daily", "trade_date", start, end, code): + LOGGER.info("指数 %s 已覆盖 %s-%s,跳过", code, start_str, end_str) + continue + save_records("index_daily", fetch_index_daily(start, end, code)) + except Exception: + LOGGER.exception("指数行情拉取失败:%s", code) + raise + + advance("拉取基金净值数据") + fund_targets = tuple(dict.fromkeys(ETF_CODES + FUND_CODES)) + for code in fund_targets: + try: + if not force and _should_skip_range("fund_nav", "nav_date", start, end, code): + LOGGER.info("基金 %s 净值已覆盖 %s-%s,跳过", code, start_str, end_str) + continue + save_records("fund_nav", fetch_fund_nav(start, end, code)) + except Exception: + LOGGER.exception("基金净值拉取失败:%s", code) + raise + + advance("拉取期货/外汇行情数据") + for code in FUTURE_CODES: + try: + if not force and _should_skip_range("fut_daily", "trade_date", start, end, code): + LOGGER.info("期货 %s 已覆盖 %s-%s,跳过", code, start_str, end_str) + continue + save_records("fut_daily", fetch_fut_daily(start, end, code)) + except Exception: + LOGGER.exception("期货行情拉取失败:%s", code) + raise + for code in FX_CODES: + try: + if not force and _should_skip_range("fx_daily", "trade_date", start, end, code): + LOGGER.info("外汇 %s 已覆盖 %s-%s,跳过", code, start_str, end_str) + continue + save_records("fx_daily", fetch_fx_daily(start, end, code)) + except Exception: + LOGGER.exception("外汇行情拉取失败:%s", code) + raise + + advance("拉取港/美股行情数据") + for code in HK_CODES: + try: + if not force and _should_skip_range("hk_daily", "trade_date", start, end, code): + LOGGER.info("港股 %s 已覆盖 %s-%s,跳过", code, start_str, end_str) + continue + save_records("hk_daily", fetch_hk_daily(start, end, code)) + except Exception: + LOGGER.exception("港股行情拉取失败:%s", code) + raise + for code in US_CODES: + try: + if not force and _should_skip_range("us_daily", "trade_date", start, end, code): + LOGGER.info("美股 %s 已覆盖 %s-%s,跳过", code, start_str, end_str) + continue + save_records("us_daily", fetch_us_daily(start, end, code)) + except Exception: + LOGGER.exception("美股行情拉取失败:%s", code) + raise + if progress_hook: progress_hook("数据覆盖检查完成", 1.0) @@ -951,6 +1577,12 @@ def collect_data_coverage(start: date, end: date) -> Dict[str, Dict[str, object] add_table("adj_factor", "trade_date") add_table("stk_limit", "trade_date") add_table("suspend", "suspend_date", require_days=False) + add_table("index_daily", "trade_date") + add_table("fund_nav", "nav_date", require_days=False) + add_table("fut_daily", "trade_date", require_days=False) + add_table("fx_daily", "trade_date", require_days=False) + add_table("hk_daily", "trade_date", require_days=False) + add_table("us_daily", "trade_date", require_days=False) with db_session(read_only=True) as conn: stock_tot = conn.execute("SELECT COUNT(*) AS cnt FROM stock_basic").fetchone() @@ -976,6 +1608,7 @@ def run_ingestion(job: FetchJob, include_limits: bool = True) -> None: job.end, ts_codes=job.ts_codes, include_limits=include_limits, + include_extended=True, force=True, ) LOGGER.info("任务 %s 完成", job.name, extra=LOG_EXTRA)