add schema columns with best-effort migration and date column detection

Your Name 2025-10-08 10:41:26 +08:00
parent 264a0367b6
commit 09ff0080f4
3 changed files with 157 additions and 12 deletions

File 1 of 3

@@ -77,10 +77,20 @@ SCHEMA_STATEMENTS: Iterable[str] = (
     CREATE TABLE IF NOT EXISTS factors (
         ts_code TEXT,
         trade_date TEXT,
+        mom_5 REAL,
         mom_20 REAL,
         mom_60 REAL,
         volat_20 REAL,
+        turn_5 REAL,
         turn_20 REAL,
+        risk_penalty REAL,
+        sent_divergence REAL,
+        sent_market REAL,
+        sent_momentum REAL,
+        val_multiscore REAL,
+        val_pe_score REAL,
+        val_pb_score REAL,
+        volume_ratio_score REAL,
         updated_at TEXT,
         PRIMARY KEY (ts_code, trade_date)
     );
@@ -97,6 +107,7 @@ SCHEMA_STATEMENTS: Iterable[str] = (
     CREATE TABLE IF NOT EXISTS suspend (
         ts_code TEXT,
         suspend_date TEXT,
+        trade_date TEXT,
         resume_date TEXT,
         suspend_type TEXT,
         ann_date TEXT,
@@ -141,6 +152,17 @@ SCHEMA_STATEMENTS: Iterable[str] = (
         exp_date TEXT
     );
     """,
+    # Note: no physical `index` table is created here; derived fields
+    # such as `index.performance_peers` are produced at runtime.
+    """
+    CREATE TABLE IF NOT EXISTS macro (
+        ts_code TEXT,
+        trade_date TEXT,
+        industry_heat REAL,
+        relative_strength REAL,
+        PRIMARY KEY (ts_code, trade_date)
+    );
+    """,
     """
     CREATE TABLE IF NOT EXISTS index_daily (
         ts_code TEXT,
@@ -296,7 +318,9 @@ SCHEMA_STATEMENTS: Iterable[str] = (
         url TEXT,
         entities TEXT,
         sentiment REAL,
-        heat REAL
+        heat REAL,
+        sentiment_index REAL,
+        heat_score REAL
     );
     """,
     """
@@ -568,6 +592,78 @@ def initialize_database() -> MigrationResult:
         session.commit()
+    # Ensure missing columns are added to existing tables when possible.
+    try:
+        _ensure_columns()
+    except Exception as e:  # noqa: BLE001
+        # Non-fatal: log and continue; runtime code already derives many fields.
+        print(f"Column migration failed: {e}")
     # Return an execution summary (after creation, re-check missing tables to report them).
     remaining = _missing_tables()
     return MigrationResult(executed=executed, skipped=False, missing_tables=remaining)
+
+
+def _ensure_columns() -> None:
+    """Attempt to add known missing columns to existing tables.
+
+    This helper is conservative: it queries PRAGMA table_info and issues
+    ALTER TABLE ... ADD COLUMN only for columns that don't exist. It
+    ignores failures so initialization is non-blocking on older DB files.
+    """
+    try:
+        with db_session() as conn:
+            cursor = conn.cursor()
+
+            def table_columns(name: str) -> set:
+                try:
+                    rows = conn.execute(f"PRAGMA table_info({name})").fetchall()
+                except Exception:
+                    return set()
+                return {row[1] if isinstance(row, tuple) else row["name"] for row in rows}
+
+            desired_columns = {
+                "factors": {
+                    "mom_5": "REAL",
+                    "mom_20": "REAL",
+                    "mom_60": "REAL",
+                    "volat_20": "REAL",
+                    "turn_5": "REAL",
+                    "turn_20": "REAL",
+                    "risk_penalty": "REAL",
+                    "sent_divergence": "REAL",
+                    "sent_market": "REAL",
+                    "sent_momentum": "REAL",
+                    "val_multiscore": "REAL",
+                    "val_pe_score": "REAL",
+                    "val_pb_score": "REAL",
+                    "volume_ratio_score": "REAL",
+                    "updated_at": "TEXT",
+                },
+                "news": {
+                    "sentiment_index": "REAL",
+                    "heat_score": "REAL",
+                },
+                "macro": {
+                    "industry_heat": "REAL",
+                    "relative_strength": "REAL",
+                },
+            }
+            for table, cols in desired_columns.items():
+                existing = table_columns(table)
+                if not existing:
+                    # Table may not exist yet; skip and let CREATE TABLE handle it.
+                    continue
+                for col, coltype in cols.items():
+                    if col in existing:
+                        continue
+                    try:
+                        cursor.execute(f"ALTER TABLE {table} ADD COLUMN {col} {coltype}")
+                    except Exception:
+                        # Best-effort: ignore failures (e.g., a concurrently added column).
+                        continue
+            conn.commit()
+    except Exception:
+        # Swallow so initialization never fails on legacy or read-only files.
+        return

File 2 of 3

@@ -765,6 +765,18 @@ class DataBroker:
         table, column = normalized
         resolved = self._resolve_column(table, column)
         if not resolved:
+            # Certain fields are derived at runtime and intentionally
+            # do not require physical columns. Suppress noisy debug logs
+            # for those known derived fields so startup isn't spammy.
+            derived_fields = {
+                "macro.industry_heat",
+                "macro.relative_strength",
+                "index.performance_peers",
+                "news.heat_score",
+                "news.sentiment_index",
+            }
+            if f"{table}.{column}" in derived_fields:
+                return None
             LOGGER.debug(
                 "field does not exist table=%s column=%s",
                 table,
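Because the derived-field set never changes at runtime, one possible refinement (a sketch only, not part of this commit) is hoisting it to a module-level frozenset so it is not rebuilt on every unresolved lookup:

# Hypothetical module-level constant; the commit builds the set inline instead.
DERIVED_FIELDS: frozenset[str] = frozenset({
    "macro.industry_heat",
    "macro.relative_strength",
    "index.performance_peers",
    "news.heat_score",
    "news.sentiment_index",
})

def is_derived_field(table: str, column: str) -> bool:
    """True for fields computed at runtime that have no physical column."""
    return f"{table}.{column}" in DERIVED_FIELDS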

File 3 of 3

@@ -10,6 +10,32 @@ class BrokerQueryEngine:
     """Lightweight wrapper around standard query patterns."""

     session_factory: Callable[..., object]
+    _date_cache: dict[str, str | None] | None = None
+
+    def _find_date_column(self, conn, table: str) -> str | None:
+        """Return the best date column for the table, or None if none is found."""
+        if self._date_cache is None:
+            self._date_cache = {}
+        if table in self._date_cache:
+            return self._date_cache[table]
+        try:
+            rows = conn.execute(f"PRAGMA table_info({table})").fetchall()
+        except Exception:
+            self._date_cache[table] = None
+            return None
+        cols = [row[1] if isinstance(row, tuple) else row["name"] for row in rows]
+        # Prefer the canonical 'trade_date' column.
+        if "trade_date" in cols:
+            self._date_cache[table] = "trade_date"
+            return "trade_date"
+        # Otherwise fall back to the first column ending in '_date'.
+        for c in cols:
+            if isinstance(c, str) and c.endswith("_date"):
+                self._date_cache[table] = c
+                return c
+        # No date-like column.
+        self._date_cache[table] = None
+        return None
     def fetch_latest(
         self,
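The detection heuristic in `_find_date_column` is straightforward to check against a throwaway database: prefer `trade_date`, fall back to the first `*_date` column, otherwise give up. A standalone sketch of the same preference order (table names are illustrative):

import sqlite3

def find_date_column(conn: sqlite3.Connection, table: str) -> str | None:
    cols = [row[1] for row in conn.execute(f"PRAGMA table_info({table})")]
    if "trade_date" in cols:
        return "trade_date"
    return next((c for c in cols if c.endswith("_date")), None)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily (ts_code TEXT, trade_date TEXT, close REAL)")
conn.execute("CREATE TABLE suspend (ts_code TEXT, suspend_date TEXT)")
conn.execute("CREATE TABLE meta (key TEXT, value TEXT)")
assert find_date_column(conn, "daily") == "trade_date"
assert find_date_column(conn, "suspend") == "suspend_date"
assert find_date_column(conn, "meta") is None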
@@ -21,12 +47,17 @@ class BrokerQueryEngine:
         if not columns:
             return None
         joined_cols = ", ".join(columns)
-        query = (
-            f"SELECT trade_date, {joined_cols} FROM {table} "
-            "WHERE ts_code = ? AND trade_date <= ? "
-            "ORDER BY trade_date DESC LIMIT 1"
-        )
         with self.session_factory(read_only=True) as conn:
+            date_col = self._find_date_column(conn, table)
+            if table == "suspend" or date_col is None:
+                # For the suspend table (and tables with no date column),
+                # query by ts_code only and take the newest row by rowid.
+                query = (
+                    f"SELECT {joined_cols} FROM {table} "
+                    "WHERE ts_code = ? ORDER BY rowid DESC LIMIT 1"
+                )
+                return conn.execute(query, (ts_code,)).fetchone()
+            query = (
+                f"SELECT {date_col}, {joined_cols} FROM {table} "
+                f"WHERE ts_code = ? AND {date_col} <= ? "
+                f"ORDER BY {date_col} DESC LIMIT 1"
+            )
             return conn.execute(query, (ts_code, trade_date)).fetchone()
     def fetch_series(
@@ -37,13 +68,19 @@ class BrokerQueryEngine:
         end_date: str,
         limit: int,
     ) -> List[Mapping[str, object]]:
-        query = (
-            f"SELECT trade_date, {column} FROM {table} "
-            "WHERE ts_code = ? AND trade_date <= ? "
-            "ORDER BY trade_date DESC LIMIT ?"
-        )
         with self.session_factory(read_only=True) as conn:
-            rows = conn.execute(query, (ts_code, end_date, limit)).fetchall()
+            date_col = self._find_date_column(conn, table)
+            if date_col is None:
+                # No date column: return the most recent rows by rowid.
+                query = (
+                    f"SELECT rowid AS trade_date, {column} FROM {table} "
+                    "WHERE ts_code = ? ORDER BY rowid DESC LIMIT ?"
+                )
+                rows = conn.execute(query, (ts_code, limit)).fetchall()
+            else:
+                query = (
+                    f"SELECT {date_col} AS trade_date, {column} FROM {table} "
+                    f"WHERE ts_code = ? AND {date_col} <= ? "
+                    f"ORDER BY {date_col} DESC LIMIT ?"
+                )
+                rows = conn.execute(query, (ts_code, end_date, limit)).fetchall()
         return list(rows)
     def fetch_table(
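Taken together, the two fallbacks let callers hit date-less tables without special-casing. A usage sketch, assuming `BrokerQueryEngine` is constructed with a `session_factory` that accepts a `read_only` flag and yields a sqlite3 connection (the project's real factory may differ), and using keyword arguments because the exact positional order is not visible in this diff:

import sqlite3
from contextlib import contextmanager

@contextmanager
def session_factory(read_only: bool = False):
    # Hypothetical factory and path, for illustration only.
    conn = sqlite3.connect("quant.db")
    try:
        yield conn
    finally:
        conn.close()

engine = BrokerQueryEngine(session_factory=session_factory)
# 'daily' has a trade_date column, so the dated query path is used.
latest = engine.fetch_latest(table="daily", ts_code="000001.SZ",
                             trade_date="20251008", columns=["close"])
# 'suspend' has suspend_date but is special-cased to rowid ordering.
row = engine.fetch_latest(table="suspend", ts_code="000001.SZ",
                          trade_date="20251008", columns=["suspend_type"])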