From 09ff0080f400af161269da90b1ae279a42b25176 Mon Sep 17 00:00:00 2001
From: Your Name
Date: Wed, 8 Oct 2025 10:41:26 +0800
Subject: [PATCH] add schema columns with best-effort migration and date column detection

---
 app/data/schema.py       | 98 +++++++++++++++++++++++++++++++++++++++-
 app/utils/data_access.py | 12 +++++
 app/utils/db_query.py    | 59 +++++++++++++++++++-----
 3 files changed, 157 insertions(+), 12 deletions(-)

diff --git a/app/data/schema.py b/app/data/schema.py
index fe3b5f0..f011db6 100644
--- a/app/data/schema.py
+++ b/app/data/schema.py
@@ -77,10 +77,20 @@ SCHEMA_STATEMENTS: Iterable[str] = (
     CREATE TABLE IF NOT EXISTS factors (
         ts_code TEXT,
         trade_date TEXT,
+        mom_5 REAL,
         mom_20 REAL,
         mom_60 REAL,
         volat_20 REAL,
+        turn_5 REAL,
         turn_20 REAL,
+        risk_penalty REAL,
+        sent_divergence REAL,
+        sent_market REAL,
+        sent_momentum REAL,
+        val_multiscore REAL,
+        val_pe_score REAL,
+        val_pb_score REAL,
+        volume_ratio_score REAL,
         updated_at TEXT,
         PRIMARY KEY (ts_code, trade_date)
     );
@@ -97,6 +107,7 @@ SCHEMA_STATEMENTS: Iterable[str] = (
     CREATE TABLE IF NOT EXISTS suspend (
         ts_code TEXT,
         suspend_date TEXT,
+        trade_date TEXT,
         resume_date TEXT,
         suspend_type TEXT,
         ann_date TEXT,
@@ -141,6 +152,17 @@ SCHEMA_STATEMENTS: Iterable[str] = (
         exp_date TEXT
     );
     """,
+    # note: no physical `index` table is created here; derived fields
+    # such as `index.performance_peers` are produced at runtime.
+    """
+    CREATE TABLE IF NOT EXISTS macro (
+        ts_code TEXT,
+        trade_date TEXT,
+        industry_heat REAL,
+        relative_strength REAL,
+        PRIMARY KEY (ts_code, trade_date)
+    );
+    """,
     """
     CREATE TABLE IF NOT EXISTS index_daily (
         ts_code TEXT,
@@ -296,7 +318,9 @@ SCHEMA_STATEMENTS: Iterable[str] = (
         url TEXT,
         entities TEXT,
         sentiment REAL,
-        heat REAL
+        heat REAL,
+        sentiment_index REAL,
+        heat_score REAL
     );
     """,
     """
@@ -568,6 +592,78 @@ def initialize_database() -> MigrationResult:
         session.commit()
 
+    # Ensure missing columns are added to existing tables when possible.
+    try:
+        _ensure_columns()
+    except Exception as e:  # noqa: BLE001
+        # Non-fatal: log and continue; runtime code already derives many fields.
+        print(f"Column migration failed: {e}")
+
     # Return an execution summary (re-check missing tables after creation).
     remaining = _missing_tables()
     return MigrationResult(executed=executed, skipped=False, missing_tables=remaining)
+
+
+def _ensure_columns() -> None:
+    """Attempt to add known missing columns to existing tables.
+
+    This helper is conservative: it queries PRAGMA table_info and issues
+    ALTER TABLE ... ADD COLUMN only for columns that don't exist. It
+    ignores failures so initialization is non-blocking on older DB files.
+    """
+    try:
+        with db_session() as conn:
+            cursor = conn.cursor()
+
+            def table_columns(name: str) -> set:
+                try:
+                    rows = conn.execute(f"PRAGMA table_info({name})").fetchall()
+                except Exception:
+                    return set()
+                return {row[1] if isinstance(row, tuple) else row["name"] for row in rows}
+
+            desired_columns = {
+                "factors": {
+                    "mom_5": "REAL",
+                    "mom_20": "REAL",
+                    "mom_60": "REAL",
+                    "volat_20": "REAL",
+                    "turn_5": "REAL",
+                    "turn_20": "REAL",
+                    "risk_penalty": "REAL",
+                    "sent_divergence": "REAL",
+                    "sent_market": "REAL",
+                    "sent_momentum": "REAL",
+                    "val_multiscore": "REAL",
+                    "val_pe_score": "REAL",
+                    "val_pb_score": "REAL",
+                    "volume_ratio_score": "REAL",
+                    "updated_at": "TEXT",
+                },
+                "news": {
+                    "sentiment_index": "REAL",
+                    "heat_score": "REAL",
+                },
+                "macro": {
+                    "industry_heat": "REAL",
+                    "relative_strength": "REAL",
+                },
+            }
+
+            for table, cols in desired_columns.items():
+                existing = table_columns(table)
+                if not existing:
+                    # table may not exist yet; CREATE TABLE IF NOT EXISTS covers it
+                    continue
+                for col, coltype in cols.items():
+                    if col in existing:
+                        continue
+                    try:
+                        cursor.execute(f"ALTER TABLE {table} ADD COLUMN {col} {coltype}")
+                    except Exception:
+                        # best-effort: ignore failures (e.g., a column added concurrently)
+                        continue
+            conn.commit()
+    except Exception:
+        # swallow to avoid failing initialization
+        return
diff --git a/app/utils/data_access.py b/app/utils/data_access.py
index ba37654..be73124 100644
--- a/app/utils/data_access.py
+++ b/app/utils/data_access.py
@@ -765,6 +765,18 @@ class DataBroker:
         table, column = normalized
         resolved = self._resolve_column(table, column)
         if not resolved:
+            # Certain fields are derived at runtime and intentionally
+            # do not require physical columns. Suppress noisy debug logs
+            # for those known derived fields so startup logs stay quiet.
+            derived_fields = {
+                "macro.industry_heat",
+                "macro.relative_strength",
+                "index.performance_peers",
+                "news.heat_score",
+                "news.sentiment_index",
+            }
+            if f"{table}.{column}" in derived_fields:
+                return None
             LOGGER.debug(
                 "field does not exist table=%s column=%s",
                 table,
diff --git a/app/utils/db_query.py b/app/utils/db_query.py
index 860c291..9b4ea7b 100644
--- a/app/utils/db_query.py
+++ b/app/utils/db_query.py
@@ -10,6 +10,32 @@ class BrokerQueryEngine:
     """Lightweight wrapper around standard query patterns."""
 
     session_factory: Callable[..., object]
+    _date_cache: dict | None = None
+
+    def _find_date_column(self, conn, table: str) -> str | None:
+        """Return the best date column for the table, or None if none is found."""
+        if self._date_cache is None:
+            self._date_cache = {}
+        if table in self._date_cache:
+            return self._date_cache[table]
+        try:
+            rows = conn.execute(f"PRAGMA table_info({table})").fetchall()
+        except Exception:
+            self._date_cache[table] = None
+            return None
+        cols = [row[1] if isinstance(row, tuple) else row["name"] for row in rows]
+        # Prefer the canonical 'trade_date'.
+        if "trade_date" in cols:
+            self._date_cache[table] = "trade_date"
+            return "trade_date"
+        # Otherwise fall back to the first column ending in '_date'.
+        for c in cols:
+            if isinstance(c, str) and c.endswith("_date"):
+                self._date_cache[table] = c
+                return c
+        # No date-like column.
+        self._date_cache[table] = None
+        return None
 
     def fetch_latest(
         self,
@@ -21,12 +47,17 @@
         if not columns:
             return None
         joined_cols = ", ".join(columns)
-        query = (
-            f"SELECT trade_date, {joined_cols} FROM {table} "
-            "WHERE ts_code = ? AND trade_date <= ? "
" - "ORDER BY trade_date DESC LIMIT 1" - ) with self.session_factory(read_only=True) as conn: + date_col = self._find_date_column(conn, table) + if table == "suspend" or date_col is None: + # For suspend table we prefer to query by ts_code only + query = f"SELECT {joined_cols} FROM {table} WHERE ts_code = ? ORDER BY rowid DESC LIMIT 1" + return conn.execute(query, (ts_code,)).fetchone() + query = ( + f"SELECT {date_col}, {joined_cols} FROM {table} " + f"WHERE ts_code = ? AND {date_col} <= ? " + f"ORDER BY {date_col} DESC LIMIT 1" + ) return conn.execute(query, (ts_code, trade_date)).fetchone() def fetch_series( @@ -37,13 +68,19 @@ class BrokerQueryEngine: end_date: str, limit: int, ) -> List[Mapping[str, object]]: - query = ( - f"SELECT trade_date, {column} FROM {table} " - "WHERE ts_code = ? AND trade_date <= ? " - "ORDER BY trade_date DESC LIMIT ?" - ) with self.session_factory(read_only=True) as conn: - rows = conn.execute(query, (ts_code, end_date, limit)).fetchall() + date_col = self._find_date_column(conn, table) + if date_col is None: + # No date column: return most recent rows by rowid + query = f"SELECT rowid AS trade_date, {column} FROM {table} WHERE ts_code = ? ORDER BY rowid DESC LIMIT ?" + rows = conn.execute(query, (ts_code, limit)).fetchall() + else: + query = ( + f"SELECT {date_col} AS trade_date, {column} FROM {table} " + f"WHERE ts_code = ? AND {date_col} <= ? " + f"ORDER BY {date_col} DESC LIMIT ?" + ) + rows = conn.execute(query, (ts_code, end_date, limit)).fetchall() return list(rows) def fetch_table(