From ef15433fc865ac09712a0e73f1a311e6b1e774af Mon Sep 17 00:00:00 2001 From: sam Date: Wed, 8 Oct 2025 13:38:00 +0800 Subject: [PATCH] add index weight schema and data ingestion for index constituent weights --- app/ingest/tushare.py | 80 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/app/ingest/tushare.py b/app/ingest/tushare.py index 98551a0..56e8e04 100644 --- a/app/ingest/tushare.py +++ b/app/ingest/tushare.py @@ -365,6 +365,17 @@ _TABLE_SCHEMAS: Dict[str, str] = { PRIMARY KEY (ts_code, trade_date) ); """, + "index_weight": """ + CREATE TABLE IF NOT EXISTS index_weight ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + index_code VARCHAR(10) NOT NULL, + trade_date VARCHAR(8) NOT NULL, + ts_code VARCHAR(10) NOT NULL, + weight FLOAT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + INDEX idx_index_weight_lookup (index_code, trade_date) + ); + """, "fund_basic": """ CREATE TABLE IF NOT EXISTS fund_basic ( ts_code TEXT PRIMARY KEY, @@ -596,6 +607,12 @@ _TABLE_COLUMNS: Dict[str, List[str]] = { "vol", "amount", ], + "index_weight": [ + "index_code", + "trade_date", + "ts_code", + "weight", + ], "fund_basic": [ "ts_code", "name", @@ -1204,6 +1221,35 @@ def fetch_index_daily(start: date, end: date, ts_code: str) -> Iterable[Dict]: return _df_to_records(df, _TABLE_COLUMNS["index_daily"]) +def fetch_index_weight(start: date, end: date, index_code: str) -> Iterable[Dict]: + """拉取指定指数的成分股权重数据。 + + Args: + start: 开始日期 + end: 结束日期 + index_code: 指数代码,如 "000300.SH" + + Returns: + 成分股权重数据列表 + """ + client = _ensure_client() + start_str = _format_date(start) + end_str = _format_date(end) + LOGGER.info( + "拉取指数成分股权重:%s %s-%s", + index_code, + start_str, + end_str, + extra=LOG_EXTRA, + ) + df = _fetch_paginated( + "index_weight", + {"index_code": index_code, "start_date": start_str, "end_date": end_str}, + limit=5000, + ) + return _df_to_records(df, ["index_code", "trade_date", "ts_code", "weight"]) + + def fetch_fund_basic(asset_class: str = "E", status: str = "L") -> Iterable[Dict]: client = _ensure_client() LOGGER.info("拉取基金基础信息:asset_class=%s status=%s", asset_class, status, extra=LOG_EXTRA) @@ -1365,6 +1411,29 @@ def ensure_trade_calendar(start: date, end: date, exchanges: Sequence[str] = ("S save_records("trade_calendar", fetch_trade_calendar(start, end, exchange=exch)) +def ensure_index_weights(start: date, end: date, index_codes: Optional[Sequence[str]] = None) -> None: + """确保指定指数的成分股权重数据完整。 + + Args: + start: 开始日期 + end: 结束日期 + index_codes: 指数代码列表,如果为 None 则使用默认的 A 股指数 + """ + if index_codes is None: + # 默认获取 A 股指数的成分股权重 + index_codes = [code for code in INDEX_CODES if code.endswith(".SH") or code.endswith(".SZ")] + + for index_code in index_codes: + start_str = _format_date(start) + end_str = _format_date(end) + + if _range_needs_refresh("index_weight", "trade_date", start_str, end_str, index_code=index_code): + LOGGER.info("指数 %s 的成分股权重数据不完整,开始拉取 %s-%s", index_code, start_str, end_str) + save_records("index_weight", fetch_index_weight(start, end, index_code)) + else: + LOGGER.info("指数 %s 的成分股权重数据已完整,跳过", index_code) + + def ensure_data_coverage( start: date, end: date, @@ -1421,6 +1490,16 @@ def ensure_data_coverage( job = FetchJob("daily_autofill", start=start, end=end) LOGGER.info("开始拉取日线行情:%s-%s", start_str, end_str) save_records("daily", fetch_daily_bars(job, skip_existing=not force)) + + advance("处理指数成分股权重数据") + # 获取默认指数列表 + default_indices = [code for code in INDEX_CODES if code.endswith(".SH") or code.endswith(".SZ")] + for index_code in default_indices: + if not force and _should_skip_range("index_weight", "trade_date", start, end, index_code): + LOGGER.info("指数 %s 的成分股权重已覆盖 %s-%s,跳过", index_code, start_str, end_str) + continue + LOGGER.info("开始拉取指数成分股权重:%s %s-%s", index_code, start_str, end_str) + save_records("index_weight", fetch_index_weight(start, end, index_code)) date_cols = { "daily_basic": "trade_date", @@ -1431,6 +1510,7 @@ def ensure_data_coverage( date_cols.update( { "index_daily": "trade_date", + "index_weight": "trade_date", "fund_nav": "nav_date", "fut_daily": "trade_date", "fx_daily": "trade_date",