This commit is contained in:
sam 2025-09-27 18:39:16 +08:00
parent 15a50cad93
commit b9a359d501
3 changed files with 832 additions and 19 deletions

View File

@ -21,6 +21,16 @@
- **数据覆盖自检**`app/ingest/tushare.py` 封装 TuShare 拉取、增量更新与覆盖统计,`app/ingest/checker.py` 提供强制补数与窗口化覆盖报告。
- **事件驱动回测**`app/backtest/engine.py` 构建日频回测循环,将代理决策与投资组合状态解耦,便于扩展成交撮合与绩效统计。
- **可视化与解释**`app/ui/streamlit_app.py` 提供四大页签(今日计划、回测与复盘、数据与设置、自检测试),结合 Plotly 图形展示和 `app/llm` 提示卡片生成器,支撑人机协作分析。
- **统一日志与持久化**SQLite 统一存储行情、回测与日志,配合 `DatabaseLogHandler` 在 UI/抓数流程中输出结构化运行轨迹,支持快速追踪与复盘。
- **跨市场数据扩展**`app/ingest/tushare.py` 追加指数、ETF/公募基金、期货、外汇、港股与美股的增量拉取逻辑,确保多资产因子与宏观代理所需的行情基础数据齐备。
## LLM + 多智能体最佳实践
- **强化结构化特征**:除 A 股行情外,引入资金流、因子、宏观等数据,为六类代理提供更丰富上下文。
- **场景化 Prompt**:在 `app/llm` 中注入代理贡献、宏观状态和风险事件,让 LLM 输出的策略解释与信号一致。
- **闭环反馈机制**:将回测或实盘的真实收益、成交等结果写回 SQLite用于调整代理权重与 Prompt 语料。
- **多层日志监控**保留代理评分、决策信心、LLM 提示与 UI 操作日志,帮助定位“谁做的决策、为何失败”。
- **人机协同流程**:在 UI 呈现代理分歧与 LLM 风险提示,分析师可调权、重跑或复核,实现人在环路的策略流程。
## 环境依赖与安装

View File

@ -102,6 +102,167 @@ SCHEMA_STATEMENTS: Iterable[str] = (
);
""",
"""
CREATE TABLE IF NOT EXISTS index_basic (
ts_code TEXT PRIMARY KEY,
name TEXT,
fullname TEXT,
market TEXT,
publisher TEXT,
index_type TEXT,
category TEXT,
base_date TEXT,
base_point REAL,
list_date TEXT,
weight_rule TEXT,
desc TEXT,
exp_date TEXT
);
""",
"""
CREATE TABLE IF NOT EXISTS index_daily (
ts_code TEXT,
trade_date TEXT,
close REAL,
open REAL,
high REAL,
low REAL,
pre_close REAL,
change REAL,
pct_chg REAL,
vol REAL,
amount REAL,
PRIMARY KEY (ts_code, trade_date)
);
""",
"""
CREATE TABLE IF NOT EXISTS fund_basic (
ts_code TEXT PRIMARY KEY,
name TEXT,
management TEXT,
custodian TEXT,
fund_type TEXT,
found_date TEXT,
due_date TEXT,
list_date TEXT,
issue_date TEXT,
delist_date TEXT,
issue_amount REAL,
m_fee REAL,
c_fee REAL,
benchmark TEXT,
status TEXT,
invest_type TEXT,
type TEXT,
trustee TEXT,
purc_start_date TEXT,
redm_start_date TEXT,
market TEXT
);
""",
"""
CREATE TABLE IF NOT EXISTS fund_nav (
ts_code TEXT,
nav_date TEXT,
ann_date TEXT,
unit_nav REAL,
accum_nav REAL,
accum_div REAL,
net_asset REAL,
total_netasset REAL,
adj_nav REAL,
update_flag TEXT,
PRIMARY KEY (ts_code, nav_date)
);
""",
"""
CREATE TABLE IF NOT EXISTS fut_basic (
ts_code TEXT PRIMARY KEY,
symbol TEXT,
name TEXT,
exchange TEXT,
exchange_full_name TEXT,
product TEXT,
product_name TEXT,
variety TEXT,
list_date TEXT,
delist_date TEXT,
trade_unit REAL,
per_unit REAL,
quote_unit TEXT,
settle_month TEXT,
contract_size REAL,
tick_size REAL,
margin_rate REAL,
margin_ratio REAL,
delivery_month TEXT,
delivery_day TEXT
);
""",
"""
CREATE TABLE IF NOT EXISTS fut_daily (
ts_code TEXT,
trade_date TEXT,
pre_settle REAL,
open REAL,
high REAL,
low REAL,
close REAL,
settle REAL,
change1 REAL,
change2 REAL,
vol REAL,
amount REAL,
oi REAL,
oi_chg REAL,
PRIMARY KEY (ts_code, trade_date)
);
""",
"""
CREATE TABLE IF NOT EXISTS fx_daily (
ts_code TEXT,
trade_date TEXT,
bid REAL,
ask REAL,
mid REAL,
high REAL,
low REAL,
PRIMARY KEY (ts_code, trade_date)
);
""",
"""
CREATE TABLE IF NOT EXISTS hk_daily (
ts_code TEXT,
trade_date TEXT,
close REAL,
open REAL,
high REAL,
low REAL,
pre_close REAL,
change REAL,
pct_chg REAL,
vol REAL,
amount REAL,
exchange TEXT,
PRIMARY KEY (ts_code, trade_date)
);
""",
"""
CREATE TABLE IF NOT EXISTS us_daily (
ts_code TEXT,
trade_date TEXT,
close REAL,
open REAL,
high REAL,
low REAL,
pre_close REAL,
change REAL,
pct_chg REAL,
vol REAL,
amount REAL,
PRIMARY KEY (ts_code, trade_date)
);
""",
"""
CREATE TABLE IF NOT EXISTS news (
id TEXT PRIMARY KEY,
ts_code TEXT,
@ -212,6 +373,15 @@ REQUIRED_TABLES = (
"suspend",
"trade_calendar",
"stk_limit",
"index_basic",
"index_daily",
"fund_basic",
"fund_nav",
"fut_basic",
"fut_daily",
"fx_daily",
"hk_daily",
"us_daily",
"news",
"heat_daily",
"bt_config",

View File

@ -2,8 +2,9 @@
from __future__ import annotations
import os
import sqlite3
import time
from collections import deque
from collections import defaultdict, deque
from dataclasses import dataclass
from datetime import date
from typing import Callable, Dict, Iterable, List, Optional, Sequence, Set, Tuple
@ -27,6 +28,84 @@ API_DEFAULT_LIMIT = 5000
LOG_EXTRA = {"stage": "data_ingest"}
_CALL_QUEUE = deque()
_CALL_BUCKETS: Dict[str, deque] = defaultdict(deque)
API_RATE_LIMITS: Dict[str, int] = {
"stock_basic": 180,
"daily": 480,
"daily_basic": 200,
"adj_factor": 200,
"suspend_d": 180,
"suspend": 180,
"stk_limit": 200,
"trade_cal": 200,
"index_basic": 120,
"index_daily": 240,
"fund_basic": 120,
"fund_nav": 200,
"fut_basic": 120,
"fut_daily": 200,
"fx_daily": 200,
"hk_daily": 2,
"us_daily": 200,
}
INDEX_CODES: Tuple[str, ...] = (
"000001.SH", # 上证综指
"000300.SH", # 沪深300
"000016.SH", # 上证50
"000905.SH", # 中证500
"399001.SZ", # 深证成指
"399005.SZ", # 中小板指
"399006.SZ", # 创业板指
"HSI.HI", # 恒生指数
"SPX.GI", # 标普500
"DJI.GI", # 道琼斯工业指数
"IXIC.GI", # 纳斯达克综合指数
"GDAXI.GI", # 德国DAX
"FTSE.GI", # 英国富时100
)
ETF_CODES: Tuple[str, ...] = (
"510300.SH", # 华泰柏瑞沪深300ETF
"510500.SH", # 南方中证500ETF
"159915.SZ", # 易方达创业板ETF
)
FUND_CODES: Tuple[str, ...] = (
"000001.OF", # 华夏成长
"110022.OF", # 易方达消费行业
)
FUTURE_CODES: Tuple[str, ...] = (
"IF9999.CFE", # 沪深300股指期货主力
"IC9999.CFE", # 中证500股指期货主力
"IH9999.CFE", # 上证50股指期货主力
)
FX_CODES: Tuple[str, ...] = (
"USDCNY", # 美元人民币
"EURCNY", # 欧元人民币
)
HK_CODES: Tuple[str, ...] = (
"00700.HK", # 腾讯控股
"00941.HK", # 中国移动
"09618.HK", # 京东集团-SW
"09988.HK", # 阿里巴巴-SW
"03690.HK", # 美团-W
)
US_CODES: Tuple[str, ...] = (
"AAPL.O", # 苹果
"MSFT.O", # 微软
"BABA.N", # 阿里巴巴美股
"JD.O", # 京东美股
"PDD.O", # 拼多多
"BIDU.O", # 百度
"BILI.O", # 哔哩哔哩
)
def _normalize_date_str(value: Optional[str]) -> Optional[str]:
@ -36,19 +115,33 @@ def _normalize_date_str(value: Optional[str]) -> Optional[str]:
return text or None
def _respect_rate_limit(cfg) -> None:
max_calls = cfg.max_calls_per_minute
if max_calls <= 0:
def _respect_rate_limit(endpoint: str | None, cfg) -> None:
def _throttle(queue: deque, limit: int) -> None:
if limit <= 0:
return
now = time.time()
window = 60.0
while _CALL_QUEUE and now - _CALL_QUEUE[0] > window:
_CALL_QUEUE.popleft()
if len(_CALL_QUEUE) >= max_calls:
sleep_time = window - (now - _CALL_QUEUE[0]) + 0.1
LOGGER.debug("触发限频控制,休眠 %.2f", sleep_time, extra=LOG_EXTRA)
while queue and now - queue[0] > window:
queue.popleft()
if len(queue) >= limit:
sleep_time = window - (now - queue[0]) + 0.1
LOGGER.debug(
"触发限频控制limit=%s)休眠 %.2f 秒 endpoint=%s",
limit,
sleep_time,
endpoint,
extra=LOG_EXTRA,
)
time.sleep(max(0.1, sleep_time))
_CALL_QUEUE.append(time.time())
queue.append(time.time())
max_calls = cfg.max_calls_per_minute
if max_calls > 0:
_throttle(_CALL_QUEUE, max_calls)
bucket_key = endpoint or "_default"
endpoint_limit = API_RATE_LIMITS.get(bucket_key, max_calls)
_throttle(_CALL_BUCKETS[bucket_key], endpoint_limit or 0)
def _df_to_records(df: pd.DataFrame, allowed_cols: List[str]) -> List[Dict]:
@ -72,7 +165,7 @@ def _fetch_paginated(endpoint: str, params: Dict[str, object], limit: int | None
extra=LOG_EXTRA,
)
while True:
_respect_rate_limit(get_config())
_respect_rate_limit(endpoint, get_config())
call = getattr(client, endpoint)
try:
df = call(limit=limit, offset=offset, **clean_params)
@ -218,6 +311,167 @@ _TABLE_SCHEMAS: Dict[str, str] = {
PRIMARY KEY (ts_code, trade_date)
);
""",
"index_basic": """
CREATE TABLE IF NOT EXISTS index_basic (
ts_code TEXT PRIMARY KEY,
name TEXT,
fullname TEXT,
market TEXT,
publisher TEXT,
index_type TEXT,
category TEXT,
base_date TEXT,
base_point REAL,
list_date TEXT,
weight_rule TEXT,
desc TEXT,
exp_date TEXT
);
""",
"index_daily": """
CREATE TABLE IF NOT EXISTS index_daily (
ts_code TEXT,
trade_date TEXT,
close REAL,
open REAL,
high REAL,
low REAL,
pre_close REAL,
change REAL,
pct_chg REAL,
vol REAL,
amount REAL,
PRIMARY KEY (ts_code, trade_date)
);
""",
"fund_basic": """
CREATE TABLE IF NOT EXISTS fund_basic (
ts_code TEXT PRIMARY KEY,
name TEXT,
management TEXT,
custodian TEXT,
fund_type TEXT,
found_date TEXT,
due_date TEXT,
list_date TEXT,
issue_date TEXT,
delist_date TEXT,
issue_amount REAL,
m_fee REAL,
c_fee REAL,
benchmark TEXT,
status TEXT,
invest_type TEXT,
type TEXT,
trustee TEXT,
purc_start_date TEXT,
redm_start_date TEXT,
market TEXT
);
""",
"fund_nav": """
CREATE TABLE IF NOT EXISTS fund_nav (
ts_code TEXT,
nav_date TEXT,
ann_date TEXT,
unit_nav REAL,
accum_nav REAL,
accum_div REAL,
net_asset REAL,
total_netasset REAL,
adj_nav REAL,
update_flag TEXT,
PRIMARY KEY (ts_code, nav_date)
);
""",
"fut_basic": """
CREATE TABLE IF NOT EXISTS fut_basic (
ts_code TEXT PRIMARY KEY,
symbol TEXT,
name TEXT,
exchange TEXT,
exchange_full_name TEXT,
product TEXT,
product_name TEXT,
variety TEXT,
list_date TEXT,
delist_date TEXT,
trade_unit REAL,
per_unit REAL,
quote_unit TEXT,
settle_month TEXT,
contract_size REAL,
tick_size REAL,
margin_rate REAL,
margin_ratio REAL,
delivery_month TEXT,
delivery_day TEXT
);
""",
"fut_daily": """
CREATE TABLE IF NOT EXISTS fut_daily (
ts_code TEXT,
trade_date TEXT,
pre_settle REAL,
open REAL,
high REAL,
low REAL,
close REAL,
settle REAL,
change1 REAL,
change2 REAL,
vol REAL,
amount REAL,
oi REAL,
oi_chg REAL,
PRIMARY KEY (ts_code, trade_date)
);
""",
"fx_daily": """
CREATE TABLE IF NOT EXISTS fx_daily (
ts_code TEXT,
trade_date TEXT,
bid REAL,
ask REAL,
mid REAL,
high REAL,
low REAL,
PRIMARY KEY (ts_code, trade_date)
);
""",
"hk_daily": """
CREATE TABLE IF NOT EXISTS hk_daily (
ts_code TEXT,
trade_date TEXT,
close REAL,
open REAL,
high REAL,
low REAL,
pre_close REAL,
change REAL,
pct_chg REAL,
vol REAL,
amount REAL,
exchange TEXT,
PRIMARY KEY (ts_code, trade_date)
);
""",
"us_daily": """
CREATE TABLE IF NOT EXISTS us_daily (
ts_code TEXT,
trade_date TEXT,
close REAL,
open REAL,
high REAL,
low REAL,
pre_close REAL,
change REAL,
pct_chg REAL,
vol REAL,
amount REAL,
PRIMARY KEY (ts_code, trade_date)
);
""",
}
_TABLE_COLUMNS: Dict[str, List[str]] = {
@ -293,6 +547,143 @@ _TABLE_COLUMNS: Dict[str, List[str]] = {
"up_limit",
"down_limit",
],
"index_basic": [
"ts_code",
"name",
"fullname",
"market",
"publisher",
"index_type",
"category",
"base_date",
"base_point",
"list_date",
"weight_rule",
"desc",
"exp_date",
],
"index_daily": [
"ts_code",
"trade_date",
"close",
"open",
"high",
"low",
"pre_close",
"change",
"pct_chg",
"vol",
"amount",
],
"fund_basic": [
"ts_code",
"name",
"management",
"custodian",
"fund_type",
"found_date",
"due_date",
"list_date",
"issue_date",
"delist_date",
"issue_amount",
"m_fee",
"c_fee",
"benchmark",
"status",
"invest_type",
"type",
"trustee",
"purc_start_date",
"redm_start_date",
"market",
],
"fund_nav": [
"ts_code",
"nav_date",
"ann_date",
"unit_nav",
"accum_nav",
"accum_div",
"net_asset",
"total_netasset",
"adj_nav",
"update_flag",
],
"fut_basic": [
"ts_code",
"symbol",
"name",
"exchange",
"exchange_full_name",
"product",
"product_name",
"variety",
"list_date",
"delist_date",
"trade_unit",
"per_unit",
"quote_unit",
"settle_month",
"contract_size",
"tick_size",
"margin_rate",
"margin_ratio",
"delivery_month",
"delivery_day",
],
"fut_daily": [
"ts_code",
"trade_date",
"pre_settle",
"open",
"high",
"low",
"close",
"settle",
"change1",
"change2",
"vol",
"amount",
"oi",
"oi_chg",
],
"fx_daily": [
"ts_code",
"trade_date",
"bid",
"ask",
"mid",
"high",
"low",
],
"hk_daily": [
"ts_code",
"trade_date",
"close",
"open",
"high",
"low",
"pre_close",
"change",
"pct_chg",
"vol",
"amount",
"exchange",
],
"us_daily": [
"ts_code",
"trade_date",
"close",
"open",
"high",
"low",
"pre_close",
"change",
"pct_chg",
"vol",
"amount",
],
}
@ -394,8 +785,11 @@ def _range_stats(
if ts_code:
sql += " AND ts_code = ?"
params.append(ts_code)
try:
with db_session(read_only=True) as conn:
row = conn.execute(sql, tuple(params)).fetchone()
except sqlite3.OperationalError:
return {"min": None, "max": None, "distinct": 0}
return {
"min": row["min_d"] if row else None,
"max": row["max_d"] if row else None,
@ -426,8 +820,11 @@ def _existing_suspend_dates(start_str: str, end_str: str, ts_code: str | None =
if ts_code:
sql += " AND ts_code = ?"
params.append(ts_code)
try:
with db_session(read_only=True) as conn:
rows = conn.execute(sql, tuple(params)).fetchall()
except sqlite3.OperationalError:
return set()
return {row["suspend_date"] for row in rows if row["suspend_date"]}
@ -477,6 +874,7 @@ def fetch_stock_basic(exchange: Optional[str] = None, list_status: str = "L") ->
list_status,
extra=LOG_EXTRA,
)
_respect_rate_limit("stock_basic", get_config())
fields = "ts_code,symbol,name,area,industry,market,exchange,list_status,list_date,delist_date"
df = client.stock_basic(exchange=exchange, list_status=list_status, fields=fields)
return _df_to_records(df, _TABLE_COLUMNS["stock_basic"])
@ -715,6 +1113,7 @@ def fetch_trade_calendar(start: date, end: date, exchange: str = "SSE") -> Itera
end_date,
extra=LOG_EXTRA,
)
_respect_rate_limit("trade_cal", get_config())
df = client.trade_cal(exchange=exchange, start_date=start_date, end_date=end_date)
if df is not None and not df.empty and "is_open" in df.columns:
df["is_open"] = pd.to_numeric(df["is_open"], errors="coerce").fillna(0).astype(int)
@ -757,6 +1156,144 @@ def fetch_stk_limit(
return _df_to_records(merged, _TABLE_COLUMNS["stk_limit"])
def fetch_index_basic(market: Optional[str] = None) -> Iterable[Dict]:
client = _ensure_client()
LOGGER.info("拉取指数基础信息market=%s", market or "all", extra=LOG_EXTRA)
_respect_rate_limit("index_basic", get_config())
df = client.index_basic(market=market)
return _df_to_records(df, _TABLE_COLUMNS["index_basic"])
def fetch_index_daily(start: date, end: date, ts_code: str) -> Iterable[Dict]:
client = _ensure_client()
start_str = _format_date(start)
end_str = _format_date(end)
LOGGER.info(
"拉取指数日线:%s %s-%s",
ts_code,
start_str,
end_str,
extra=LOG_EXTRA,
)
df = _fetch_paginated(
"index_daily",
{"ts_code": ts_code, "start_date": start_str, "end_date": end_str},
limit=5000,
)
return _df_to_records(df, _TABLE_COLUMNS["index_daily"])
def fetch_fund_basic(asset_class: str = "E", status: str = "L") -> Iterable[Dict]:
client = _ensure_client()
LOGGER.info("拉取基金基础信息asset_class=%s status=%s", asset_class, status, extra=LOG_EXTRA)
_respect_rate_limit("fund_basic", get_config())
df = client.fund_basic(market=asset_class, status=status)
return _df_to_records(df, _TABLE_COLUMNS["fund_basic"])
def fetch_fund_nav(start: date, end: date, ts_code: str) -> Iterable[Dict]:
client = _ensure_client()
start_str = _format_date(start)
end_str = _format_date(end)
LOGGER.info(
"拉取基金净值:%s %s-%s",
ts_code,
start_str,
end_str,
extra=LOG_EXTRA,
)
df = _fetch_paginated(
"fund_nav",
{"ts_code": ts_code, "start_date": start_str, "end_date": end_str},
limit=5000,
)
return _df_to_records(df, _TABLE_COLUMNS["fund_nav"])
def fetch_fut_basic(exchange: Optional[str] = None) -> Iterable[Dict]:
client = _ensure_client()
LOGGER.info("拉取期货基础信息exchange=%s", exchange or "all", extra=LOG_EXTRA)
_respect_rate_limit("fut_basic", get_config())
df = client.fut_basic(exchange=exchange)
return _df_to_records(df, _TABLE_COLUMNS["fut_basic"])
def fetch_fut_daily(start: date, end: date, ts_code: str) -> Iterable[Dict]:
client = _ensure_client()
start_str = _format_date(start)
end_str = _format_date(end)
LOGGER.info(
"拉取期货日线:%s %s-%s",
ts_code,
start_str,
end_str,
extra=LOG_EXTRA,
)
df = _fetch_paginated(
"fut_daily",
{"ts_code": ts_code, "start_date": start_str, "end_date": end_str},
limit=4000,
)
return _df_to_records(df, _TABLE_COLUMNS["fut_daily"])
def fetch_fx_daily(start: date, end: date, ts_code: str) -> Iterable[Dict]:
client = _ensure_client()
start_str = _format_date(start)
end_str = _format_date(end)
LOGGER.info(
"拉取外汇日线:%s %s-%s",
ts_code,
start_str,
end_str,
extra=LOG_EXTRA,
)
df = _fetch_paginated(
"fx_daily",
{"ts_code": ts_code, "start_date": start_str, "end_date": end_str},
limit=4000,
)
return _df_to_records(df, _TABLE_COLUMNS["fx_daily"])
def fetch_hk_daily(start: date, end: date, ts_code: str) -> Iterable[Dict]:
client = _ensure_client()
start_str = _format_date(start)
end_str = _format_date(end)
LOGGER.info(
"拉取港股日线:%s %s-%s",
ts_code,
start_str,
end_str,
extra=LOG_EXTRA,
)
df = _fetch_paginated(
"hk_daily",
{"ts_code": ts_code, "start_date": start_str, "end_date": end_str},
limit=4000,
)
return _df_to_records(df, _TABLE_COLUMNS["hk_daily"])
def fetch_us_daily(start: date, end: date, ts_code: str) -> Iterable[Dict]:
client = _ensure_client()
start_str = _format_date(start)
end_str = _format_date(end)
LOGGER.info(
"拉取美股日线:%s %s-%s",
ts_code,
start_str,
end_str,
extra=LOG_EXTRA,
)
df = _fetch_paginated(
"us_daily",
{"ts_code": ts_code, "start_date": start_str, "end_date": end_str},
limit=4000,
)
return _df_to_records(df, _TABLE_COLUMNS["us_daily"])
def save_records(table: str, rows: Iterable[Dict]) -> None:
items = list(rows)
if not items:
@ -812,6 +1349,7 @@ def ensure_data_coverage(
end: date,
ts_codes: Optional[Sequence[str]] = None,
include_limits: bool = True,
include_extended: bool = True,
force: bool = False,
progress_hook: Callable[[str, float], None] | None = None,
) -> None:
@ -819,7 +1357,12 @@ def ensure_data_coverage(
start_str = _format_date(start)
end_str = _format_date(end)
total_steps = 5 + (1 if include_limits else 0)
extra_steps = 0
if include_limits:
extra_steps += 1
if include_extended:
extra_steps += 4
total_steps = 5 + extra_steps
current_step = 0
def advance(message: str) -> None:
@ -864,6 +1407,16 @@ def ensure_data_coverage(
"stk_limit": "trade_date",
"suspend": "suspend_date",
}
date_cols.update(
{
"index_daily": "trade_date",
"fund_nav": "nav_date",
"fut_daily": "trade_date",
"fx_daily": "trade_date",
"hk_daily": "trade_date",
"us_daily": "trade_date",
}
)
def _save_with_codes(table: str, fetch_fn) -> None:
date_col = date_cols.get(table, "trade_date")
@ -914,6 +1467,79 @@ def ensure_data_coverage(
advance("处理停复牌信息")
_save_with_codes("suspend", fetch_suspensions)
if include_extended:
advance("同步指数/基金/期货基础信息")
try:
save_records("index_basic", fetch_index_basic())
save_records("fund_basic", fetch_fund_basic())
save_records("fut_basic", fetch_fut_basic())
except Exception:
LOGGER.exception("扩展基础信息拉取失败")
raise
advance("拉取指数行情数据")
for code in INDEX_CODES:
try:
if not force and _should_skip_range("index_daily", "trade_date", start, end, code):
LOGGER.info("指数 %s 已覆盖 %s-%s,跳过", code, start_str, end_str)
continue
save_records("index_daily", fetch_index_daily(start, end, code))
except Exception:
LOGGER.exception("指数行情拉取失败:%s", code)
raise
advance("拉取基金净值数据")
fund_targets = tuple(dict.fromkeys(ETF_CODES + FUND_CODES))
for code in fund_targets:
try:
if not force and _should_skip_range("fund_nav", "nav_date", start, end, code):
LOGGER.info("基金 %s 净值已覆盖 %s-%s,跳过", code, start_str, end_str)
continue
save_records("fund_nav", fetch_fund_nav(start, end, code))
except Exception:
LOGGER.exception("基金净值拉取失败:%s", code)
raise
advance("拉取期货/外汇行情数据")
for code in FUTURE_CODES:
try:
if not force and _should_skip_range("fut_daily", "trade_date", start, end, code):
LOGGER.info("期货 %s 已覆盖 %s-%s,跳过", code, start_str, end_str)
continue
save_records("fut_daily", fetch_fut_daily(start, end, code))
except Exception:
LOGGER.exception("期货行情拉取失败:%s", code)
raise
for code in FX_CODES:
try:
if not force and _should_skip_range("fx_daily", "trade_date", start, end, code):
LOGGER.info("外汇 %s 已覆盖 %s-%s,跳过", code, start_str, end_str)
continue
save_records("fx_daily", fetch_fx_daily(start, end, code))
except Exception:
LOGGER.exception("外汇行情拉取失败:%s", code)
raise
advance("拉取港/美股行情数据")
for code in HK_CODES:
try:
if not force and _should_skip_range("hk_daily", "trade_date", start, end, code):
LOGGER.info("港股 %s 已覆盖 %s-%s,跳过", code, start_str, end_str)
continue
save_records("hk_daily", fetch_hk_daily(start, end, code))
except Exception:
LOGGER.exception("港股行情拉取失败:%s", code)
raise
for code in US_CODES:
try:
if not force and _should_skip_range("us_daily", "trade_date", start, end, code):
LOGGER.info("美股 %s 已覆盖 %s-%s,跳过", code, start_str, end_str)
continue
save_records("us_daily", fetch_us_daily(start, end, code))
except Exception:
LOGGER.exception("美股行情拉取失败:%s", code)
raise
if progress_hook:
progress_hook("数据覆盖检查完成", 1.0)
@ -951,6 +1577,12 @@ def collect_data_coverage(start: date, end: date) -> Dict[str, Dict[str, object]
add_table("adj_factor", "trade_date")
add_table("stk_limit", "trade_date")
add_table("suspend", "suspend_date", require_days=False)
add_table("index_daily", "trade_date")
add_table("fund_nav", "nav_date", require_days=False)
add_table("fut_daily", "trade_date", require_days=False)
add_table("fx_daily", "trade_date", require_days=False)
add_table("hk_daily", "trade_date", require_days=False)
add_table("us_daily", "trade_date", require_days=False)
with db_session(read_only=True) as conn:
stock_tot = conn.execute("SELECT COUNT(*) AS cnt FROM stock_basic").fetchone()
@ -976,6 +1608,7 @@ def run_ingestion(job: FetchJob, include_limits: bool = True) -> None:
job.end,
ts_codes=job.ts_codes,
include_limits=include_limits,
include_extended=True,
force=True,
)
LOGGER.info("任务 %s 完成", job.name, extra=LOG_EXTRA)