From d6292e2b2f454ac78fba7a90a505eaa38f24e117 Mon Sep 17 00:00:00 2001 From: sam Date: Sat, 18 Oct 2025 19:19:13 +0800 Subject: [PATCH] add disabled tables configuration for data ingestion control --- app/ingest/coverage.py | 13 +++++++++++++ app/utils/config.py | 22 +++++++++++++++++++++- 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/app/ingest/coverage.py b/app/ingest/coverage.py index 33e439a..cdbb983 100644 --- a/app/ingest/coverage.py +++ b/app/ingest/coverage.py @@ -8,6 +8,7 @@ from typing import Callable, Dict, Iterable, List, Optional, Sequence from app.data.schema import initialize_database from app.utils.db import db_session +from app.utils.config import get_config from app.utils.logging import get_logger from .api_client import ( @@ -177,6 +178,15 @@ def ensure_data_coverage( initialize_database() start_str = _format_date(start) end_str = _format_date(end) + cfg = get_config() + disabled_tables = { + name.strip().lower() + for name in getattr(cfg, "disabled_ingest_tables", set()) + if isinstance(name, str) and name.strip() + } + + def _is_disabled(table: str) -> bool: + return table.lower() in disabled_tables extra_steps = 0 if include_limits: @@ -256,6 +266,9 @@ def ensure_data_coverage( *, targets: Optional[Iterable[str]] = None, ) -> None: + if _is_disabled(table): + LOGGER.info("表 %s 已在禁用列表,跳过拉取", table, extra=LOG_EXTRA) + return date_col = date_cols.get(table, "trade_date") incremental = table in incremental_tables sig = signature(fetch_fn) diff --git a/app/utils/config.py b/app/utils/config.py index 293e234..b567a91 100644 --- a/app/utils/config.py +++ b/app/utils/config.py @@ -6,7 +6,7 @@ import json import logging import os from pathlib import Path -from typing import Dict, Iterable, List, Mapping, Optional +from typing import Dict, Iterable, List, Mapping, Optional, Set LOGGER = logging.getLogger(__name__) @@ -567,6 +567,7 @@ class AppConfig: departments: Dict[str, DepartmentSettings] = field(default_factory=_default_departments) portfolio: PortfolioSettings = field(default_factory=PortfolioSettings) alert_channels: Dict[str, AlertChannelSettings] = field(default_factory=dict) + disabled_ingest_tables: Set[str] = field(default_factory=set) def resolve_llm(self, route: Optional[str] = None) -> LLMConfig: return self.llm @@ -626,6 +627,16 @@ def _load_from_file(cfg: AppConfig) -> None: cfg.force_refresh = bool(payload.get("force_refresh")) if "auto_update_data" in payload: cfg.auto_update_data = bool(payload.get("auto_update_data")) + disabled_tables_raw = payload.get("disabled_ingest_tables") + if isinstance(disabled_tables_raw, list): + cfg.disabled_ingest_tables = { + str(item).strip().lower() + for item in disabled_tables_raw + if isinstance(item, str) and item.strip() + } + elif isinstance(disabled_tables_raw, str): + normalized = disabled_tables_raw.strip().lower() + cfg.disabled_ingest_tables = {normalized} if normalized else set() log_level_raw = payload.get("log_level") if isinstance(log_level_raw, str) and log_level_raw.strip(): cfg.log_level = log_level_raw.strip() @@ -942,6 +953,7 @@ def save_config(cfg: AppConfig | None = None) -> None: "force_refresh": cfg.force_refresh, "auto_update_data": cfg.auto_update_data, "decision_method": cfg.decision_method, + "disabled_ingest_tables": sorted(cfg.disabled_ingest_tables), "rss_sources": cfg.rss_sources, "agent_weights": cfg.agent_weights.as_dict(), "portfolio": { @@ -1055,6 +1067,14 @@ def _load_env_defaults(cfg: AppConfig) -> None: channel.tags = [tag.strip() for tag in tags_raw.split(",") if tag.strip()] cfg.alert_channels[key] = channel + disabled_tables_env = os.getenv("LLM_QUANT_DISABLED_TABLES") + if disabled_tables_env: + cfg.disabled_ingest_tables = { + part.strip().lower() + for part in disabled_tables_env.split(",") + if part.strip() + } + cfg.sync_runtime_llm()