diff --git a/app/agents/scopes.py b/app/agents/scopes.py index f513424..bdda105 100644 --- a/app/agents/scopes.py +++ b/app/agents/scopes.py @@ -20,8 +20,6 @@ _GAME_SCOPE_MAP: Dict[GameStructure, Set[str]] = { "daily.high", "daily_basic.turnover_rate", "daily_basic.volume_ratio", - "factors.sent_momentum", - "factors.sent_market", }, GameStructure.BAYESIAN: { "daily.close", @@ -29,7 +27,6 @@ _GAME_SCOPE_MAP: Dict[GameStructure, Set[str]] = { "factors.mom_20", "factors.mom_60", "factors.val_multiscore", - "factors.sent_divergence", }, GameStructure.CUSTOM: { "factors.risk_penalty", diff --git a/app/backtest/engine.py b/app/backtest/engine.py index 7ded11f..2daf81e 100644 --- a/app/backtest/engine.py +++ b/app/backtest/engine.py @@ -165,9 +165,6 @@ class BacktestEngine: "factors.volume_ratio_score", "factors.val_multiscore", "factors.risk_penalty", - "factors.sent_momentum", - "factors.sent_market", - "factors.sent_divergence", } selected_structures = ( cfg.game_structures diff --git a/app/data/schema.py b/app/data/schema.py index dc1854c..958d25a 100644 --- a/app/data/schema.py +++ b/app/data/schema.py @@ -22,6 +22,12 @@ SCHEMA_STATEMENTS: Iterable[str] = ( ); """, """ + CREATE TABLE IF NOT EXISTS ingest_state ( + source TEXT PRIMARY KEY, + last_published TEXT + ); + """, + """ CREATE TABLE IF NOT EXISTS stock_basic ( ts_code TEXT PRIMARY KEY, symbol TEXT, @@ -599,6 +605,7 @@ SCHEMA_STATEMENTS: Iterable[str] = ( ) REQUIRED_TABLES = ( + "ingest_state", "stock_basic", "daily", "daily_basic", diff --git a/app/features/factors.py b/app/features/factors.py index ad0701e..21a1219 100644 --- a/app/features/factors.py +++ b/app/features/factors.py @@ -15,9 +15,7 @@ from app.utils.db import db_session from app.utils.logging import get_logger # 导入扩展因子模块 from app.features.extended_factors import ExtendedFactors -from app.features.sentiment_factors import SentimentFactors from app.features.value_risk_factors import ValueRiskFactors -from app.ingest.news import prepare_news_for_factors # 导入因子验证功能 from app.features.validation import check_data_sufficiency, check_data_sufficiency_for_zero_window, detect_outliers # 导入UI进度状态管理 @@ -100,10 +98,6 @@ DEFAULT_FACTORS: List[FactorSpec] = [ FactorSpec("market_regime", 0), # 市场状态因子 FactorSpec("trend_strength", 0), # 趋势强度因子 # 情绪因子 - FactorSpec("sent_momentum", 20), # 新闻情感动量 - FactorSpec("sent_impact", 0), # 新闻影响力 - FactorSpec("sent_market", 20), # 市场情绪指数 - FactorSpec("sent_divergence", 0), # 行业情绪背离度 # 风险和估值因子 FactorSpec("risk_penalty", 0), # 风险惩罚因子 ] @@ -162,10 +156,6 @@ def compute_factors( LOGGER.info("无可用标的生成因子 trade_date=%s", trade_date_str, extra=LOG_EXTRA) return [] - if any(spec.name.startswith("sent_") for spec in specs): - # 情绪因子需要依赖最新的新闻情绪/热度评分,先确保新闻数据落库 - prepare_news_for_factors(trade_date, lookback_days=7) - if skip_existing: # 检查所有因子名称 factor_names = [spec.name for spec in specs] @@ -921,12 +911,8 @@ def _compute_security_factors( # 检查是否为扩展因子 from app.features.extended_factors import EXTENDED_FACTORS extended_factor_names = [spec.name for spec in EXTENDED_FACTORS] - - # 检查是否为情绪因子 - sentiment_factor_names = ["sent_momentum", "sent_impact", "sent_market", "sent_divergence"] - - if spec.name in extended_factor_names or spec.name in sentiment_factor_names: - # 扩展因子和情绪因子将在后续统一计算,这里不记录日志 + if spec.name in extended_factor_names: + # 扩展因子将在后续统一计算,这里不记录日志 pass else: LOGGER.info( @@ -941,12 +927,6 @@ def _compute_security_factors( extended_factors = calculator.compute_all_factors(close_series, volume_series, ts_code, trade_date) results.update(extended_factors) - # 计算情感因子 - sentiment_calculator = SentimentFactors() - sentiment_factors = sentiment_calculator.compute_stock_factors(broker, ts_code, trade_date) - if sentiment_factors: - results.update(sentiment_factors) - # 计算风险和估值因子 value_risk_calculator = ValueRiskFactors() diff --git a/app/features/sentiment_factors.py b/app/features/sentiment_factors.py index 61ae642..310183c 100644 --- a/app/features/sentiment_factors.py +++ b/app/features/sentiment_factors.py @@ -65,107 +65,13 @@ class SentimentFactors: Returns: 因子名称到因子值的映射字典 """ - results = {} - - try: - # 获取历史新闻数据 - news_data = broker.get_news_data( - ts_code, - trade_date, - limit=30 # 保留足够历史以计算动量 - ) - - if not news_data: - LOGGER.debug( - "无新闻数据 code=%s date=%s", - ts_code, - trade_date, - extra=LOG_EXTRA - ) - return {name: None for name in self.factor_specs} - - # 提取序列数据 - sentiment_series = [row["sentiment"] for row in news_data] - heat_series = [row["heat"] for row in news_data] - entity_counts = [ - len(row["entities"].split(",")) if row["entities"] else 0 - for row in news_data - ] - - # 1. 计算新闻情感动量 - results["sent_momentum"] = news_sentiment_momentum( - sentiment_series, - window=self.factor_specs["sent_momentum"] - ) - - # 2. 计算新闻影响力 - # 使用最新一条新闻的数据 - results["sent_impact"] = news_impact_score( - sentiment=sentiment_series[0], - heat=heat_series[0], - entity_count=entity_counts[0] - ) - - # 3. 计算市场情绪指数 - # 获取成交量数据 - volume_data = broker.fetch_latest( - ts_code, - trade_date, - fields=["daily_basic.volume_ratio"] - ) - if volume_data and "daily_basic.volume_ratio" in volume_data: - volume_ratio = volume_data["daily_basic.volume_ratio"] - # 使用单个成交量比率值 - results["sent_market"] = market_sentiment_index( - sentiment_series, - heat_series, - [volume_ratio], # 转换为列表 - window=self.factor_specs["sent_market"] - ) - else: - results["sent_market"] = None - - # 4. 计算行业情绪背离度 - industry = broker._lookup_industry(ts_code) - if industry: - industry_sent = broker._derived_industry_sentiment( - industry, - trade_date - ) - if industry_sent is not None: - # 获取同行业股票的情感得分 - peer_sents = [] - for peer in broker.get_industry_stocks(industry): - if peer != ts_code: - peer_data = broker.get_news_data( - peer, - trade_date, - limit=1 - ) - if peer_data: - peer_sents.append(peer_data[0]["sentiment"]) - - results["sent_divergence"] = industry_sentiment_divergence( - industry_sent, - peer_sents - ) - else: - results["sent_divergence"] = None - else: - results["sent_divergence"] = None - - except Exception as e: - LOGGER.error( - "计算情绪因子出错 code=%s date=%s error=%s", - ts_code, - trade_date, - str(e), - exc_info=True, - extra=LOG_EXTRA - ) - return {name: None for name in self.factor_specs} - - return results + LOGGER.debug( + "新闻因子计算已禁用,返回空结果 code=%s date=%s", + ts_code, + trade_date, + extra=LOG_EXTRA, + ) + return {name: None for name in self.factor_specs} def compute_batch( self, diff --git a/app/ingest/gdelt.py b/app/ingest/gdelt.py index 3a0c063..26af6db 100644 --- a/app/ingest/gdelt.py +++ b/app/ingest/gdelt.py @@ -37,6 +37,8 @@ _LANGUAGE_CANONICAL: Dict[str, str] = { "chinese": "zh", } +_LAST_INGEST_STATS: Dict[str, int] = {"fetched": 0, "deduped": 0, "inserted": 0} + @dataclass class GdeltSourceConfig: @@ -381,6 +383,21 @@ def fetch_gdelt_articles( return items +def _update_last_published_state(items: Sequence[rss_ingest.RssItem]) -> None: + latest_by_source: Dict[str, datetime] = {} + for item in items: + metadata = item.metadata or {} + source_key = str(metadata.get("source_key", "")) + if not source_key: + continue + current = latest_by_source.get(source_key) + published = item.published + if current is None or published > current: + latest_by_source[source_key] = published + for source_key, timestamp in latest_by_source.items(): + _save_last_published(source_key, timestamp) + + def ingest_configured_gdelt( start: Optional[DateLike] = None, end: Optional[DateLike] = None, @@ -398,7 +415,6 @@ def ingest_configured_gdelt( end_dt = _ensure_datetime(end, start_of_day=False) if end else None aggregated: List[rss_ingest.RssItem] = [] - latest_by_source: Dict[str, datetime] = {} fetched = 0 for config in sources: source_start = start_dt @@ -424,26 +440,22 @@ def ingest_configured_gdelt( LOGGER.info("GDELT 来源 %s 返回 %s 条记录", config.label, len(items), extra=LOG_EXTRA) if not aggregated: + _LAST_INGEST_STATS.update({"fetched": 0, "deduped": 0, "inserted": 0}) return 0 deduped = rss_ingest.deduplicate_items(aggregated) if not deduped: LOGGER.info("GDELT 数据全部为重复项,跳过落库", extra=LOG_EXTRA) + _update_last_published_state(aggregated) + _LAST_INGEST_STATS.update({"fetched": fetched, "deduped": 0, "inserted": 0}) return 0 inserted = rss_ingest.save_news_items(deduped) if inserted: - latest_by_source.clear() - for item in deduped: - source_key = str(item.metadata.get("source_key", "") if item.metadata else "") - if not source_key: - continue - current = latest_by_source.get(source_key) - candidate = item.published - if current is None or candidate > current: - latest_by_source[source_key] = candidate - for source_key, timestamp in latest_by_source.items(): - _save_last_published(source_key, timestamp) + _update_last_published_state(deduped) + else: + _update_last_published_state(aggregated) + _LAST_INGEST_STATS.update({"fetched": fetched, "deduped": len(deduped), "inserted": inserted}) LOGGER.info( "GDELT 新闻落库完成 fetched=%s deduped=%s inserted=%s", fetched, @@ -454,9 +466,16 @@ def ingest_configured_gdelt( return inserted +def get_last_ingest_stats() -> Dict[str, int]: + """Return a copy of the most recent ingestion stats.""" + + return dict(_LAST_INGEST_STATS) + + __all__ = [ "GdeltSourceConfig", "resolve_gdelt_sources", "fetch_gdelt_articles", "ingest_configured_gdelt", + "get_last_ingest_stats", ] diff --git a/app/ui/views/tests.py b/app/ui/views/tests.py index 1224341..aceee99 100644 --- a/app/ui/views/tests.py +++ b/app/ui/views/tests.py @@ -89,6 +89,7 @@ def render_tests() -> None: ) if st.button("运行 GDELT 新闻测试"): from app.ingest.news import ingest_latest_news + from app.ingest.gdelt import get_last_ingest_stats LOGGER.info( "点击 GDELT 新闻测试按钮 days=%s force=%s", @@ -99,7 +100,15 @@ def render_tests() -> None: with st.spinner("正在抓取 GDELT 新闻..."): try: count = ingest_latest_news(days_back=news_days, force=force_news) - st.success(f"GDELT 新闻测试完成,新增 {count} 条新闻记录。") + stats = get_last_ingest_stats() + fetched = stats.get("fetched", 0) + deduped = stats.get("deduped", 0) + inserted = stats.get("inserted", count) + st.success( + f"GDELT 新闻测试完成:抓取 {fetched} 条,去重后 {deduped} 条,新增 {inserted} 条。" + ) + if inserted == 0 and fetched: + st.info("提示:所有抓取新闻已存在于数据库,本次未新增记录。") except Exception as exc: # noqa: BLE001 LOGGER.exception("GDELT 新闻测试失败", extra=LOG_EXTRA) st.error(f"GDELT 新闻测试失败:{exc}") diff --git a/tests/test_backtest_engine.py b/tests/test_backtest_engine.py index 34c4999..d224c92 100644 --- a/tests/test_backtest_engine.py +++ b/tests/test_backtest_engine.py @@ -24,8 +24,5 @@ def test_required_fields_include_precomputed_factors(isolated_db): "factors.volume_ratio_score", "factors.val_multiscore", "factors.risk_penalty", - "factors.sent_momentum", - "factors.sent_market", - "factors.sent_divergence", } assert expected_fields.issubset(required) diff --git a/tests/test_sentiment_factors.py b/tests/test_sentiment_factors.py index 49d2893..d963a8e 100644 --- a/tests/test_sentiment_factors.py +++ b/tests/test_sentiment_factors.py @@ -85,12 +85,8 @@ def test_compute_stock_factors(): "20251001" ) - assert "sent_momentum" in factors - assert "sent_impact" in factors - assert "sent_market" in factors - assert "sent_divergence" in factors - - assert factors["sent_impact"] > 0 + assert all(name in factors for name in ("sent_momentum", "sent_impact", "sent_market", "sent_divergence")) + assert all(value is None for value in factors.values()) # 测试无数据的情况 factors = calculator.compute_stock_factors( @@ -120,7 +116,7 @@ def test_compute_batch(tmp_path): ts_codes = ["000001.SZ", "000002.SZ", "600000.SH"] calculator.compute_batch(broker, ts_codes, "20251001") - # 验证数据已保存 + # 验证未写入任何情绪因子数据 from app.utils.db import db_session with db_session() as conn: rows = conn.execute( @@ -128,6 +124,4 @@ def test_compute_batch(tmp_path): ("20251001",) ).fetchall() - # 应该只有一个股票有数据 - assert len(rows) == 1 - assert rows[0]["ts_code"] == "000001.SZ" + assert rows == []