From c2fc3edaf0d28c0f314612b366b2a28b2c726e32 Mon Sep 17 00:00:00 2001 From: sam Date: Sat, 27 Sep 2025 16:12:05 +0800 Subject: [PATCH] update --- app/ingest/tushare.py | 22 ++++++++++++++++++++++ app/utils/config.py | 1 + app/utils/logging.py | 18 +++++++++++++++++- 3 files changed, 40 insertions(+), 1 deletion(-) diff --git a/app/ingest/tushare.py b/app/ingest/tushare.py index 974ae2b..8d74b2d 100644 --- a/app/ingest/tushare.py +++ b/app/ingest/tushare.py @@ -2,6 +2,8 @@ from __future__ import annotations import os +import time +from collections import deque from dataclasses import dataclass from datetime import date from typing import Callable, Dict, Iterable, List, Optional, Sequence, Tuple @@ -24,6 +26,23 @@ LOGGER = get_logger(__name__) API_DEFAULT_LIMIT = 5000 LOG_EXTRA = {"stage": "data_ingest"} +_CALL_QUEUE = deque() + + +def _respect_rate_limit(cfg) -> None: + max_calls = cfg.max_calls_per_minute + if max_calls <= 0: + return + now = time.time() + window = 60.0 + while _CALL_QUEUE and now - _CALL_QUEUE[0] > window: + _CALL_QUEUE.popleft() + if len(_CALL_QUEUE) >= max_calls: + sleep_time = window - (now - _CALL_QUEUE[0]) + 0.1 + LOGGER.debug("触发限频控制,休眠 %.2f 秒", sleep_time, extra=LOG_EXTRA) + time.sleep(max(0.1, sleep_time)) + _CALL_QUEUE.append(time.time()) + def _existing_date_range( table: str, @@ -63,6 +82,7 @@ def _fetch_paginated(endpoint: str, params: Dict[str, object], limit: int | None extra=LOG_EXTRA, ) while True: + _respect_rate_limit(get_config()) call = getattr(client, endpoint) try: df = call(limit=limit, offset=offset, **clean_params) @@ -608,6 +628,8 @@ def fetch_trade_calendar(start: date, end: date, exchange: str = "SSE") -> Itera end_date = _format_date(end) LOGGER.info("拉取交易日历(交易所:%s,区间:%s-%s)", exchange, start_date, end_date) df = client.trade_cal(exchange=exchange, start_date=start_date, end_date=end_date) + if df is not None and not df.empty and "is_open" in df.columns: + df["is_open"] = pd.to_numeric(df["is_open"], errors="coerce").fillna(0).astype(int) return _df_to_records(df, _TABLE_COLUMNS["trade_calendar"]) diff --git a/app/utils/config.py b/app/utils/config.py index 007517a..5a304af 100644 --- a/app/utils/config.py +++ b/app/utils/config.py @@ -55,6 +55,7 @@ class AppConfig: data_paths: DataPaths = field(default_factory=DataPaths) agent_weights: AgentWeights = field(default_factory=AgentWeights) force_refresh: bool = False + max_calls_per_minute: int = 180 CONFIG = AppConfig() diff --git a/app/utils/logging.py b/app/utils/logging.py index 783697b..9f06d51 100644 --- a/app/utils/logging.py +++ b/app/utils/logging.py @@ -73,9 +73,10 @@ def setup_logging( level = logging.DEBUG cfg = get_config() + timestamp = datetime.now().strftime("%Y%m%d_%H%M") log_dir: Path = cfg.data_paths.root / "logs" log_dir.mkdir(parents=True, exist_ok=True) - logfile = log_dir / "app.log" + logfile = log_dir / f"app_{timestamp}.log" root = logging.getLogger() root.setLevel(level) @@ -98,6 +99,21 @@ def setup_logging( root.addHandler(db_handler) _IS_CONFIGURED = True + + cfg = get_config() + root.info( + "日志系统初始化完成", + extra={ + "stage": "config", + "config": { + "tushare_token_set": bool(cfg.tushare_token), + "force_refresh": cfg.force_refresh, + "data_root": str(cfg.data_paths.root), + "logfile": str(logfile), + "max_calls_per_minute": cfg.max_calls_per_minute, + }, + }, + ) return root