From 454b9579c7fbee1fa04761281a3dbebc5413364f Mon Sep 17 00:00:00 2001
From: sam
Date: Wed, 8 Oct 2025 13:42:37 +0800
Subject: [PATCH] add index daily basic schema and data ingestion for index daily indicators

---
Review notes (not part of the commit message):
- Line breaks and indentation were reconstructed from a whitespace-collapsed
  copy of this patch. Context-line whitespace may not match the target tree
  (try `git apply --ignore-whitespace`), the nesting depth of the added lines
  in ensure_data_coverage should be checked by hand, and the post-image blob
  id on the `index` line predates the edits made during review.

 app/ingest/tushare.py | 115 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 115 insertions(+)

diff --git a/app/ingest/tushare.py b/app/ingest/tushare.py
index 56e8e04..d13f042 100644
--- a/app/ingest/tushare.py
+++ b/app/ingest/tushare.py
@@ -365,6 +365,21 @@ _TABLE_SCHEMAS: Dict[str, str] = {
             PRIMARY KEY (ts_code, trade_date)
         );
     """,
+    "index_dailybasic": """
+        CREATE TABLE IF NOT EXISTS index_dailybasic (
+            ts_code TEXT,
+            trade_date TEXT,
+            turnover REAL,
+            turnover_ratio REAL,
+            pe_ttm REAL,
+            pb REAL,
+            ps_ttm REAL,
+            dv_ttm REAL,
+            total_mv REAL,
+            circ_mv REAL,
+            PRIMARY KEY (ts_code, trade_date)
+        );
+    """,
     "index_weight": """
         CREATE TABLE IF NOT EXISTS index_weight (
             id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -607,6 +622,18 @@ _TABLE_COLUMNS: Dict[str, List[str]] = {
         "vol",
         "amount",
     ],
+    "index_dailybasic": [
+        "ts_code",
+        "trade_date",
+        "turnover",
+        "turnover_ratio",
+        "pe_ttm",
+        "pb",
+        "ps_ttm",
+        "dv_ttm",
+        "total_mv",
+        "circ_mv",
+    ],
     "index_weight": [
         "index_code",
         "trade_date",
@@ -1250,6 +1277,57 @@ def fetch_index_weight(start: date, end: date, index_code: str) -> Iterable[Dict
     return _df_to_records(df, ["index_code", "trade_date", "ts_code", "weight"])
 
 
+def fetch_index_dailybasic(start: date, end: date, ts_code: str) -> Iterable[Dict]:
+    """Fetch daily indicator records for a single index.
+
+    Args:
+        start: Start date of the requested range.
+        end: End date of the requested range.
+        ts_code: Index code, e.g. "000300.SH".
+
+    Returns:
+        The rows returned by the ``index_dailybasic`` query, converted with
+        ``_df_to_records`` using the column list at the end of this function.
+    """
+    # The returned client is not used below, so it is not bound to a name; the
+    # call itself is kept (presumably it sets up or validates the API client).
+    _ensure_client()
+    start_str = _format_date(start)
+    end_str = _format_date(end)
+    LOGGER.info(
+        "拉取指数每日指标:%s %s-%s",
+        ts_code,
+        start_str,
+        end_str,
+        extra=LOG_EXTRA,
+    )
+    df = _fetch_paginated(
+        "index_dailybasic",
+        {"ts_code": ts_code, "start_date": start_str, "end_date": end_str},
+        limit=5000,
+    )
+    # NOTE(review): Tushare's published output fields for index_dailybasic appear
+    # to be total_mv, float_mv, total_share, float_share, free_share,
+    # turnover_rate, turnover_rate_f, pe, pe_ttm and pb. Several names requested
+    # here (turnover, turnover_ratio, ps_ttm, dv_ttm, circ_mv) may therefore be
+    # absent from the response; confirm against the API documentation.
+    return _df_to_records(
+        df,
+        [
+            "ts_code",
+            "trade_date",
+            "turnover",
+            "turnover_ratio",
+            "pe_ttm",
+            "pb",
+            "ps_ttm",
+            "dv_ttm",
+            "total_mv",
+            "circ_mv",
+        ],
+    )
+
+
 def fetch_fund_basic(asset_class: str = "E", status: str = "L") -> Iterable[Dict]:
     client = _ensure_client()
     LOGGER.info("拉取基金基础信息:asset_class=%s status=%s", asset_class, status, extra=LOG_EXTRA)
@@ -1434,6 +1512,34 @@ def ensure_index_weights(start: date, end: date, index_codes: Optional[Sequence[
             LOGGER.info("指数 %s 的成分股权重数据已完整,跳过", index_code)
 
 
+def ensure_index_dailybasic(start: date, end: date, index_codes: Optional[Sequence[str]] = None) -> None:
+    """Ensure daily indicator data is stored for the given indices.
+
+    Each index is checked with ``_range_needs_refresh``; its data is fetched
+    and saved only when that check reports the range needs refreshing.
+
+    Args:
+        start: Start date of the range.
+        end: End date of the range.
+        index_codes: Index codes to process. When ``None``, the entries of
+            ``INDEX_CODES`` ending in ".SH" or ".SZ" are used.
+    """
+    if index_codes is None:
+        # Default to the A-share indices.
+        index_codes = [code for code in INDEX_CODES if code.endswith((".SH", ".SZ"))]
+
+    # The formatted bounds are the same for every index, so build them once.
+    start_str = _format_date(start)
+    end_str = _format_date(end)
+
+    for index_code in index_codes:
+        if _range_needs_refresh("index_dailybasic", "trade_date", start_str, end_str, ts_code=index_code):
+            LOGGER.info("指数 %s 的每日指标数据不完整,开始拉取 %s-%s", index_code, start_str, end_str)
+            save_records("index_dailybasic", fetch_index_dailybasic(start, end, index_code))
+        else:
+            LOGGER.info("指数 %s 的每日指标数据已完整,跳过", index_code)
+
+
 def ensure_data_coverage(
     start: date,
     end: date,
@@ -1500,6 +1606,14 @@ def ensure_data_coverage(
                 continue
             LOGGER.info("开始拉取指数成分股权重:%s %s-%s", index_code, start_str, end_str)
             save_records("index_weight", fetch_index_weight(start, end, index_code))
+
+        advance("处理指数每日指标数据")
+        for index_code in default_indices:
+            if not force and _should_skip_range("index_dailybasic", "trade_date", start, end, index_code):
+                LOGGER.info("指数 %s 的每日指标已覆盖 %s-%s,跳过", index_code, start_str, end_str)
+                continue
+            LOGGER.info("开始拉取指数每日指标:%s %s-%s", index_code, start_str, end_str)
+            save_records("index_dailybasic", fetch_index_dailybasic(start, end, index_code))
 
     date_cols = {
         "daily_basic": "trade_date",
@@ -1510,6 +1624,7 @@ def ensure_data_coverage(
         date_cols.update(
             {
                 "index_daily": "trade_date",
+                "index_dailybasic": "trade_date",
                 "index_weight": "trade_date",
                 "fund_nav": "nav_date",
                 "fut_daily": "trade_date",