This commit is contained in:
sam 2025-09-27 16:12:05 +08:00
parent 6c9c8e3140
commit c2fc3edaf0
3 changed files with 40 additions and 1 deletion

View File

@@ -2,6 +2,8 @@
from __future__ import annotations

import os
import time
from collections import deque
from dataclasses import dataclass
from datetime import date
from typing import Callable, Dict, Iterable, List, Optional, Sequence, Tuple
@@ -24,6 +26,23 @@ LOGGER = get_logger(__name__)
API_DEFAULT_LIMIT = 5000
LOG_EXTRA = {"stage": "data_ingest"}
_CALL_QUEUE = deque()


def _respect_rate_limit(cfg) -> None:
    """Sliding-window throttle: allow at most cfg.max_calls_per_minute API calls per 60 s."""
    max_calls = cfg.max_calls_per_minute
    if max_calls <= 0:
        return
    now = time.time()
    window = 60.0
    # Drop call timestamps that have aged out of the 60-second window.
    while _CALL_QUEUE and now - _CALL_QUEUE[0] > window:
        _CALL_QUEUE.popleft()
    if len(_CALL_QUEUE) >= max_calls:
        sleep_time = window - (now - _CALL_QUEUE[0]) + 0.1
        LOGGER.debug("Rate limit triggered, sleeping %.2f s", sleep_time, extra=LOG_EXTRA)
        time.sleep(max(0.1, sleep_time))
    _CALL_QUEUE.append(time.time())
def _existing_date_range(
    table: str,
@@ -63,6 +82,7 @@ def _fetch_paginated(endpoint: str, params: Dict[str, object], limit: int | None
        extra=LOG_EXTRA,
    )
    while True:
        _respect_rate_limit(get_config())
        call = getattr(client, endpoint)
        try:
            df = call(limit=limit, offset=offset, **clean_params)
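As a quick sanity check on the sliding-window limiter wired into the pagination loop above, here is a minimal standalone sketch. The stand-alone `respect_rate_limit` copy, the budget of 3 calls and the demo loop are illustrative assumptions, not part of this commit:

```python
import time
from collections import deque

_CALLS: deque = deque()  # timestamps of recent calls, oldest first

def respect_rate_limit(max_calls_per_minute: int) -> None:
    """Stand-alone copy of the limiter logic for experimentation."""
    if max_calls_per_minute <= 0:
        return  # throttling disabled
    window = 60.0
    now = time.time()
    # Evict timestamps older than the 60-second window.
    while _CALLS and now - _CALLS[0] > window:
        _CALLS.popleft()
    if len(_CALLS) >= max_calls_per_minute:
        # Sleep until the oldest call ages out, plus a small safety margin.
        time.sleep(max(0.1, window - (now - _CALLS[0]) + 0.1))
    _CALLS.append(time.time())

# Hypothetical demo: with a budget of 3 calls per minute, the 4th call blocks.
for i in range(4):
    respect_rate_limit(3)
    print(f"call {i} issued at {time.strftime('%H:%M:%S')}")
```

Like the committed version, the sketch sleeps at most once per call and re-stamps the queue after waking, so bursts longer than the budget spread out across successive windows.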
@@ -608,6 +628,8 @@ def fetch_trade_calendar(start: date, end: date, exchange: str = "SSE") -> Itera
    end_date = _format_date(end)
    LOGGER.info("Fetching trade calendar (exchange: %s, range: %s-%s)", exchange, start_date, end_date)
    df = client.trade_cal(exchange=exchange, start_date=start_date, end_date=end_date)
    if df is not None and not df.empty and "is_open" in df.columns:
        # Coerce is_open to plain 0/1 integers so downstream filtering stays consistent.
        df["is_open"] = pd.to_numeric(df["is_open"], errors="coerce").fillna(0).astype(int)
    return _df_to_records(df, _TABLE_COLUMNS["trade_calendar"])
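The `is_open` normalization above coerces whatever dtype the API returns into plain 0/1 integers. A small illustrative snippet (the sample values are made up) shows the effect:

```python
import pandas as pd

df = pd.DataFrame({
    "cal_date": ["20250101", "20250102", "20250103"],
    "is_open": ["1", None, 0],          # mixed string / missing / int input
})
df["is_open"] = pd.to_numeric(df["is_open"], errors="coerce").fillna(0).astype(int)
print(df["is_open"].tolist())           # [1, 0, 0]
```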

View File

@@ -55,6 +55,7 @@ class AppConfig:
    data_paths: DataPaths = field(default_factory=DataPaths)
    agent_weights: AgentWeights = field(default_factory=AgentWeights)
    force_refresh: bool = False
    max_calls_per_minute: int = 180


CONFIG = AppConfig()
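The new field is a plain dataclass default, so callers can override it per environment. A minimal sketch with a trimmed-down stand-in for AppConfig (the real class has more fields, and how CONFIG is populated at runtime is not shown in this diff):

```python
from dataclasses import dataclass, replace

@dataclass
class AppConfig:                      # stand-in; mirrors only the fields shown above
    force_refresh: bool = False
    max_calls_per_minute: int = 180   # new in this commit

cfg = AppConfig()                                      # default budget: 180 calls/min
low_tier = replace(cfg, max_calls_per_minute=60)       # hypothetical stricter budget
unlimited = replace(cfg, max_calls_per_minute=0)       # <= 0 disables throttling
print(low_tier.max_calls_per_minute, unlimited.max_calls_per_minute)  # 60 0
```

The `<= 0` escape hatch matches the early return in `_respect_rate_limit`.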

View File

@@ -73,9 +73,10 @@ def setup_logging(
        level = logging.DEBUG
    cfg = get_config()
    timestamp = datetime.now().strftime("%Y%m%d_%H%M")
    log_dir: Path = cfg.data_paths.root / "logs"
    log_dir.mkdir(parents=True, exist_ok=True)
    logfile = log_dir / f"app_{timestamp}.log"
    root = logging.getLogger()
    root.setLevel(level)
@@ -98,6 +99,21 @@
        root.addHandler(db_handler)
    _IS_CONFIGURED = True
    cfg = get_config()
    root.info(
        "Logging system initialized",
        extra={
            "stage": "config",
            "config": {
                "tushare_token_set": bool(cfg.tushare_token),
                "force_refresh": cfg.force_refresh,
                "data_root": str(cfg.data_paths.root),
                "logfile": str(logfile),
                "max_calls_per_minute": cfg.max_calls_per_minute,
            },
        },
    )
    return root
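A rough sketch of what the two logging changes produce: a per-run file named `app_<timestamp>.log` and one structured record carrying the config snapshot in `extra`. The handler and formatter wiring below is an assumption for illustration; the real `setup_logging` attaches its own file and DB handlers:

```python
import logging
from datetime import datetime
from pathlib import Path

timestamp = datetime.now().strftime("%Y%m%d_%H%M")
log_dir = Path("logs")                       # assumed location; the app uses cfg.data_paths.root / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
logfile = log_dir / f"app_{timestamp}.log"   # e.g. logs/app_20250927_1612.log

handler = logging.FileHandler(logfile, encoding="utf-8")
handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s [%(stage)s] %(message)s"))
root = logging.getLogger()
root.setLevel(logging.INFO)
root.addHandler(handler)

root.info(
    "Logging system initialized",
    extra={"stage": "config", "config": {"max_calls_per_minute": 180}},
)
```

Because the filename now embeds the start timestamp, each run writes to its own file instead of appending to the previous shared `app.log`.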