refactor decision workflow and optimize feature snapshot loading

This commit is contained in:
sam 2025-10-16 09:54:55 +08:00
parent 2062a11181
commit 2220b5084e
10 changed files with 2331 additions and 2077 deletions

View File

@ -147,79 +147,117 @@ def target_weight_for_action(action: AgentAction) -> float:
return mapping[action] return mapping[action]
def decide( class DecisionWorkflow:
context: AgentContext, def __init__(
agents: Iterable[Agent], self,
weights: Mapping[str, float], context: AgentContext,
method: str = "nash", agents: Iterable[Agent],
department_manager: Optional[DepartmentManager] = None, weights: Mapping[str, float],
department_context: Optional[DepartmentContext] = None, method: str,
) -> Decision: department_manager: Optional[DepartmentManager],
agent_list = list(agents) department_context: Optional[DepartmentContext],
utilities = compute_utilities(agent_list, context) ) -> None:
feas_actions = feasible_actions(agent_list, context) self.context = context
if not feas_actions: self.agent_list = list(agents)
return Decision( self.method = method
action=AgentAction.HOLD, self.department_manager = department_manager
confidence=0.0, self.department_context = department_context
target_weight=0.0, self.utilities = compute_utilities(self.agent_list, context)
feasible_actions=[], self.feasible_actions = feasible_actions(self.agent_list, context)
utilities=utilities, self.raw_weights = dict(weights)
self.department_decisions: Dict[str, DepartmentDecision] = {}
self.department_votes: Dict[str, float] = {}
self.host = ProtocolHost()
self.host_trace = self.host.bootstrap_trace(
session_id=f"{context.ts_code}:{context.trade_date}",
ts_code=context.ts_code,
trade_date=context.trade_date,
)
self.briefing_round = self.host.start_round(
self.host_trace,
agenda="situation_briefing",
structure=GameStructure.SIGNALING,
)
self.host.handle_message(self.briefing_round, _host_briefing_message(context))
self.host.finalize_round(self.briefing_round)
self.department_round: Optional[RoundSummary] = None
self.risk_round: Optional[RoundSummary] = None
self.execution_round: Optional[RoundSummary] = None
self.belief_updates: Dict[str, BeliefUpdate] = {}
self.prediction_round: Optional[RoundSummary] = None
self.norm_weights: Dict[str, float] = {}
self.filtered_utilities: Dict[AgentAction, Dict[str, float]] = {}
self.belief_revision: Optional[BeliefRevisionResult] = None
def run(self) -> Decision:
if not self.feasible_actions:
return Decision(
action=AgentAction.HOLD,
confidence=0.0,
target_weight=0.0,
feasible_actions=[],
utilities=self.utilities,
)
self._evaluate_departments()
action, confidence = self._select_action()
risk_assessment = self._apply_risk(action)
exec_action = self._finalize_execution(action, risk_assessment)
self._finalize_conflicts(exec_action)
rounds = self.host_trace.rounds or _build_round_summaries(
self.department_decisions,
action,
self.department_votes,
) )
raw_weights = dict(weights) return Decision(
department_decisions: Dict[str, DepartmentDecision] = {} action=action,
department_votes: Dict[str, float] = {} confidence=confidence,
host = ProtocolHost() target_weight=target_weight_for_action(action),
host_trace = host.bootstrap_trace( feasible_actions=self.feasible_actions,
session_id=f"{context.ts_code}:{context.trade_date}", utilities=self.utilities,
ts_code=context.ts_code, department_decisions=self.department_decisions,
trade_date=context.trade_date, department_votes=self.department_votes,
) requires_review=risk_assessment.status != "ok",
briefing_round = host.start_round( rounds=rounds,
host_trace, risk_assessment=risk_assessment,
agenda="situation_briefing", belief_updates=self.belief_updates,
structure=GameStructure.SIGNALING, belief_revision=self.belief_revision,
) )
host.handle_message(briefing_round, _host_briefing_message(context))
host.finalize_round(briefing_round)
department_round: Optional[RoundSummary] = None
risk_round: Optional[RoundSummary] = None
execution_round: Optional[RoundSummary] = None
belief_updates: Dict[str, BeliefUpdate] = {}
if department_manager: def _evaluate_departments(self) -> None:
dept_context = department_context if not self.department_manager:
if dept_context is None: return
dept_context = DepartmentContext(
ts_code=context.ts_code, dept_context = self.department_context or DepartmentContext(
trade_date=context.trade_date, ts_code=self.context.ts_code,
features=dict(context.features), trade_date=self.context.trade_date,
market_snapshot=dict(getattr(context, "market_snapshot", {}) or {}), features=dict(self.context.features),
raw=dict(getattr(context, "raw", {}) or {}), market_snapshot=dict(getattr(self.context, "market_snapshot", {}) or {}),
) raw=dict(getattr(self.context, "raw", {}) or {}),
department_decisions = department_manager.evaluate(dept_context) )
if department_decisions: self.department_decisions = self.department_manager.evaluate(dept_context)
department_round = host.start_round( if self.department_decisions:
host_trace, self.department_round = self.host.start_round(
self.host_trace,
agenda="department_consensus", agenda="department_consensus",
structure=GameStructure.REPEATED, structure=GameStructure.REPEATED,
) )
for code, decision in department_decisions.items(): for code, decision in self.department_decisions.items():
agent_key = f"dept_{code}" agent_key = f"dept_{code}"
dept_agent = department_manager.agents.get(code) dept_agent = self.department_manager.agents.get(code)
weight = dept_agent.settings.weight if dept_agent else 1.0 weight = dept_agent.settings.weight if dept_agent else 1.0
raw_weights[agent_key] = weight self.raw_weights[agent_key] = weight
scores = _department_scores(decision) scores = _department_scores(decision)
for action in ACTIONS: for action in ACTIONS:
utilities.setdefault(action, {})[agent_key] = scores[action] self.utilities.setdefault(action, {})[agent_key] = scores[action]
bucket = _department_vote_bucket(decision.action) bucket = _department_vote_bucket(decision.action)
if bucket: if bucket:
department_votes[bucket] = department_votes.get(bucket, 0.0) + weight * decision.confidence self.department_votes[bucket] = self.department_votes.get(bucket, 0.0) + weight * decision.confidence
if department_round: if self.department_round:
message = _department_message(code, decision) message = _department_message(code, decision)
host.handle_message(department_round, message) self.host.handle_message(self.department_round, message)
belief_updates[code] = BeliefUpdate( self.belief_updates[code] = BeliefUpdate(
belief={ belief={
"action": decision.action.value, "action": decision.action.value,
"confidence": decision.confidence, "confidence": decision.confidence,
@ -228,173 +266,185 @@ def decide(
rationale=decision.summary, rationale=decision.summary,
) )
filtered_utilities = {action: utilities[action] for action in feas_actions} def _select_action(self) -> Tuple[AgentAction, float]:
hold_scores = utilities.get(AgentAction.HOLD, {}) self.filtered_utilities = {action: self.utilities[action] for action in self.feasible_actions}
norm_weights = weight_map(raw_weights) hold_scores = self.utilities.get(AgentAction.HOLD, {})
prediction_round = host.start_round( self.norm_weights = weight_map(self.raw_weights)
host_trace, self.prediction_round = self.host.start_round(
agenda="prediction_alignment", self.host_trace,
structure=GameStructure.REPEATED, agenda="prediction_alignment",
) structure=GameStructure.REPEATED,
prediction_message, prediction_summary = _prediction_summary_message(filtered_utilities, norm_weights)
host.handle_message(prediction_round, prediction_message)
host.finalize_round(prediction_round)
if prediction_summary:
belief_updates["prediction_summary"] = BeliefUpdate(
belief=prediction_summary,
rationale="Aggregated utilities shared during alignment round.",
) )
prediction_message, prediction_summary = _prediction_summary_message(self.filtered_utilities, self.norm_weights)
if method == "vote": self.host.handle_message(self.prediction_round, prediction_message)
action, confidence = vote(filtered_utilities, norm_weights) self.host.finalize_round(self.prediction_round)
else: if prediction_summary:
action, confidence = nash_bargain(filtered_utilities, norm_weights, hold_scores) self.belief_updates["prediction_summary"] = BeliefUpdate(
if action not in feas_actions: belief=prediction_summary,
action, confidence = vote(filtered_utilities, norm_weights) rationale="Aggregated utilities shared during alignment round.",
weight = target_weight_for_action(action)
conflict_flag = _department_conflict_flag(department_votes)
risk_agent = _find_risk_agent(agent_list)
risk_assessment = _evaluate_risk(
context,
action,
department_votes,
conflict_flag,
risk_agent,
)
requires_review = risk_assessment.status != "ok"
if department_round:
department_round.notes.setdefault("department_votes", dict(department_votes))
department_round.outcome = action.value
host.finalize_round(department_round)
if requires_review:
risk_round = host.ensure_round(
host_trace,
agenda="risk_review",
structure=GameStructure.CUSTOM,
)
review_message = DialogueMessage(
sender="risk_guard",
role=DialogueRole.RISK,
message_type=MessageType.COUNTER,
content=_risk_review_message(risk_assessment.reason),
confidence=1.0,
references=list(department_votes.keys()),
annotations={
"department_votes": dict(department_votes),
"risk_reason": risk_assessment.reason,
"recommended_action": (
risk_assessment.recommended_action.value
if risk_assessment.recommended_action
else None
),
"notes": dict(risk_assessment.notes),
},
)
host.handle_message(risk_round, review_message)
risk_round.notes.setdefault("status", risk_assessment.status)
risk_round.notes.setdefault("reason", risk_assessment.reason)
if risk_assessment.recommended_action:
risk_round.notes.setdefault(
"recommended_action",
risk_assessment.recommended_action.value,
) )
risk_round.outcome = "REVIEW"
host.finalize_round(risk_round) if self.method == "vote":
belief_updates["risk_guard"] = BeliefUpdate( return vote(self.filtered_utilities, self.norm_weights)
belief={
"status": risk_assessment.status, action, confidence = nash_bargain(self.filtered_utilities, self.norm_weights, hold_scores)
"reason": risk_assessment.reason, if action not in self.feasible_actions:
"recommended_action": ( return vote(self.filtered_utilities, self.norm_weights)
risk_assessment.recommended_action.value return action, confidence
if risk_assessment.recommended_action
else None def _apply_risk(self, action: AgentAction) -> RiskAssessment:
), conflict_flag = _department_conflict_flag(self.department_votes)
risk_agent = _find_risk_agent(self.agent_list)
assessment = _evaluate_risk(
self.context,
action,
self.department_votes,
conflict_flag,
risk_agent,
)
if self.department_round:
self.department_round.notes.setdefault("department_votes", dict(self.department_votes))
self.department_round.outcome = action.value
self.host.finalize_round(self.department_round)
if assessment.status != "ok":
self.risk_round = self.host.ensure_round(
self.host_trace,
agenda="risk_review",
structure=GameStructure.CUSTOM,
)
review_message = DialogueMessage(
sender="risk_guard",
role=DialogueRole.RISK,
message_type=MessageType.COUNTER,
content=_risk_review_message(assessment.reason),
confidence=1.0,
references=list(self.department_votes.keys()),
annotations={
"department_votes": dict(self.department_votes),
"risk_reason": assessment.reason,
"recommended_action": (
assessment.recommended_action.value
if assessment.recommended_action
else None
),
"notes": dict(assessment.notes),
},
)
self.host.handle_message(self.risk_round, review_message)
self.risk_round.notes.setdefault("status", assessment.status)
self.risk_round.notes.setdefault("reason", assessment.reason)
if assessment.recommended_action:
self.risk_round.notes.setdefault(
"recommended_action",
assessment.recommended_action.value,
)
self.risk_round.outcome = "REVIEW"
self.host.finalize_round(self.risk_round)
self.belief_updates["risk_guard"] = BeliefUpdate(
belief={
"status": assessment.status,
"reason": assessment.reason,
"recommended_action": (
assessment.recommended_action.value
if assessment.recommended_action
else None
),
},
)
return assessment
def _finalize_execution(
self,
action: AgentAction,
assessment: RiskAssessment,
) -> AgentAction:
self.execution_round = self.host.ensure_round(
self.host_trace,
agenda="execution_summary",
structure=GameStructure.REPEATED,
)
exec_action = action
exec_weight = target_weight_for_action(action)
exec_status = "normal"
requires_review = assessment.status != "ok"
if requires_review and assessment.recommended_action:
exec_action = assessment.recommended_action
exec_status = "risk_adjusted"
exec_weight = target_weight_for_action(exec_action)
execution_message = DialogueMessage(
sender="execution_engine",
role=DialogueRole.EXECUTION,
message_type=MessageType.DIRECTIVE,
content=f"执行操作 {exec_action.value}",
confidence=1.0,
annotations={
"target_weight": exec_weight,
"requires_review": requires_review,
"execution_status": exec_status,
}, },
) )
execution_round = host.ensure_round( self.host.handle_message(self.execution_round, execution_message)
host_trace, self.execution_round.outcome = exec_action.value
agenda="execution_summary", self.execution_round.notes.setdefault("execution_status", exec_status)
structure=GameStructure.REPEATED, if exec_action is not action:
) self.execution_round.notes.setdefault("original_action", action.value)
exec_action = action self.belief_updates["execution"] = BeliefUpdate(
exec_weight = weight belief={
exec_status = "normal" "execution_status": exec_status,
if requires_review and risk_assessment.recommended_action: "action": exec_action.value,
exec_action = risk_assessment.recommended_action "target_weight": exec_weight,
exec_status = "risk_adjusted" },
exec_weight = target_weight_for_action(exec_action)
execution_message = DialogueMessage(
sender="execution_engine",
role=DialogueRole.EXECUTION,
message_type=MessageType.DIRECTIVE,
content=f"执行操作 {exec_action.value}",
confidence=1.0,
annotations={
"target_weight": exec_weight,
"requires_review": requires_review,
"execution_status": exec_status,
},
)
host.handle_message(execution_round, execution_message)
execution_round.outcome = exec_action.value
execution_round.notes.setdefault("execution_status", exec_status)
if exec_action is not action:
execution_round.notes.setdefault("original_action", action.value)
belief_updates["execution"] = BeliefUpdate(
belief={
"execution_status": exec_status,
"action": exec_action.value,
"target_weight": exec_weight,
},
)
host.finalize_round(execution_round)
host.close(host_trace)
rounds = host_trace.rounds if host_trace.rounds else _build_round_summaries(
department_decisions,
action,
department_votes,
)
belief_revision = revise_beliefs(belief_updates, exec_action)
if belief_revision.conflicts:
risk_round = host.ensure_round(
host_trace,
agenda="conflict_resolution",
structure=GameStructure.CUSTOM,
) )
conflict_message = DialogueMessage( self.host.finalize_round(self.execution_round)
sender="protocol_host", self.execution_round.notes.setdefault("target_weight", exec_weight)
role=DialogueRole.HOST, return exec_action
message_type=MessageType.COUNTER,
content="检测到关键冲突,需要后续回合复核。", def _finalize_conflicts(self, exec_action: AgentAction) -> None:
annotations={"conflicts": belief_revision.conflicts}, self.host.close(self.host_trace)
) self.belief_revision = revise_beliefs(self.belief_updates, exec_action)
host.handle_message(risk_round, conflict_message) if self.belief_revision.conflicts:
risk_round.notes.setdefault("conflicts", belief_revision.conflicts) conflict_round = self.host.ensure_round(
host.finalize_round(risk_round) self.host_trace,
execution_round.notes.setdefault("consensus_action", belief_revision.consensus_action.value) agenda="conflict_resolution",
execution_round.notes.setdefault("consensus_confidence", belief_revision.consensus_confidence) structure=GameStructure.CUSTOM,
if belief_revision.conflicts: )
execution_round.notes.setdefault("conflicts", belief_revision.conflicts) conflict_message = DialogueMessage(
if belief_revision.notes: sender="protocol_host",
execution_round.notes.setdefault("belief_notes", belief_revision.notes) role=DialogueRole.HOST,
return Decision( message_type=MessageType.COUNTER,
action=action, content="检测到关键冲突,需要后续回合复核。",
confidence=confidence, annotations={"conflicts": self.belief_revision.conflicts},
target_weight=weight, )
feasible_actions=feas_actions, self.host.handle_message(conflict_round, conflict_message)
utilities=utilities, conflict_round.notes.setdefault("conflicts", self.belief_revision.conflicts)
department_decisions=department_decisions, self.host.finalize_round(conflict_round)
department_votes=department_votes, if self.execution_round:
requires_review=requires_review, self.execution_round.notes.setdefault("consensus_action", self.belief_revision.consensus_action.value)
rounds=rounds, self.execution_round.notes.setdefault("consensus_confidence", self.belief_revision.consensus_confidence)
risk_assessment=risk_assessment, if self.belief_revision.conflicts:
belief_updates=belief_updates, self.execution_round.notes.setdefault("conflicts", self.belief_revision.conflicts)
belief_revision=belief_revision, if self.belief_revision.notes:
self.execution_round.notes.setdefault("belief_notes", self.belief_revision.notes)
def decide(
context: AgentContext,
agents: Iterable[Agent],
weights: Mapping[str, float],
method: str = "nash",
department_manager: Optional[DepartmentManager] = None,
department_context: Optional[DepartmentContext] = None,
) -> Decision:
workflow = DecisionWorkflow(
context,
agents,
weights,
method,
department_manager,
department_context,
) )
return workflow.run()
def _department_scores(decision: DepartmentDecision) -> Dict[AgentAction, float]: def _department_scores(decision: DepartmentDecision) -> Dict[AgentAction, float]:

View File

@ -17,6 +17,7 @@ from app.llm.metrics import record_decision as metrics_record_decision
from app.agents.registry import default_agents from app.agents.registry import default_agents
from app.data.schema import initialize_database from app.data.schema import initialize_database
from app.utils.data_access import DataBroker from app.utils.data_access import DataBroker
from app.utils.feature_snapshots import FeatureSnapshotService
from app.utils.config import PortfolioSettings, get_config from app.utils.config import PortfolioSettings, get_config
from app.utils.db import db_session from app.utils.db import db_session
from app.utils.logging import get_logger from app.utils.logging import get_logger
@ -176,18 +177,35 @@ class BacktestEngine:
trade_date_str = trade_date.strftime("%Y%m%d") trade_date_str = trade_date.strftime("%Y%m%d")
feature_map: Dict[str, Dict[str, Any]] = {} feature_map: Dict[str, Dict[str, Any]] = {}
universe = self.cfg.universe or [] universe = self.cfg.universe or []
snapshot_service = FeatureSnapshotService(self.data_broker)
batch_latest = snapshot_service.load_latest(
trade_date_str,
self.required_fields,
universe,
auto_refresh=False,
)
for ts_code in universe: for ts_code in universe:
scope_values = self.data_broker.fetch_latest( scope_values = dict(batch_latest.get(ts_code) or {})
ts_code,
trade_date_str,
self.required_fields,
auto_refresh=False # 避免回测时触发自动补数
)
missing_fields = [ missing_fields = [
field field
for field in self.required_fields for field in self.required_fields
if scope_values.get(field) is None if scope_values.get(field) is None
] ]
if missing_fields:
fallback = self.data_broker.fetch_latest(
ts_code,
trade_date_str,
missing_fields,
auto_refresh=False,
)
scope_values.update({k: v for k, v in fallback.items() if v is not None})
missing_fields = [
field
for field in self.required_fields
if scope_values.get(field) is None
]
derived_fields: List[str] = [] derived_fields: List[str] = []
if missing_fields: if missing_fields:
LOGGER.debug( LOGGER.debug(

View File

@ -5,11 +5,12 @@ import re
import sqlite3 import sqlite3
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, date, timezone, timedelta from datetime import datetime, date, timezone, timedelta
from typing import Dict, Iterable, List, Optional, Sequence, Set, Tuple, Union from typing import Dict, Iterable, List, Mapping, Optional, Sequence, Set, Tuple, Union
from app.core.indicators import momentum, rolling_mean, volatility from app.core.indicators import momentum, rolling_mean, volatility
from app.data.schema import initialize_database from app.data.schema import initialize_database
from app.utils.data_access import DataBroker from app.utils.data_access import DataBroker
from app.utils.feature_snapshots import FeatureSnapshotService
from app.utils.db import db_session from app.utils.db import db_session
from app.utils.logging import get_logger from app.utils.logging import get_logger
# 导入扩展因子模块 # 导入扩展因子模块
@ -30,6 +31,18 @@ LOGGER = get_logger(__name__)
LOG_EXTRA = {"stage": "factor_compute"} LOG_EXTRA = {"stage": "factor_compute"}
_IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") _IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
_LATEST_BASE_FIELDS: List[str] = [
"daily_basic.pe",
"daily_basic.pb",
"daily_basic.ps",
"daily_basic.turnover_rate",
"daily_basic.volume_ratio",
"daily.close",
"daily.amount",
"daily.vol",
"daily_basic.dv_ratio",
]
@dataclass @dataclass
class FactorSpec: class FactorSpec:
@ -502,6 +515,14 @@ def _compute_batch_factors(
# 批次化数据可用性检查 # 批次化数据可用性检查
available_codes = _check_batch_data_availability(broker, ts_codes, trade_date, specs) available_codes = _check_batch_data_availability(broker, ts_codes, trade_date, specs)
snapshot_service = FeatureSnapshotService(broker)
latest_snapshot = snapshot_service.load_latest(
trade_date,
_LATEST_BASE_FIELDS,
list(available_codes),
auto_refresh=False,
)
# 更新UI进度状态 - 开始处理批次 # 更新UI进度状态 - 开始处理批次
if progress and total_securities > 0: if progress and total_securities > 0:
@ -523,7 +544,13 @@ def _compute_batch_factors(
continue continue
# 计算因子值 # 计算因子值
values = _compute_security_factors(broker, ts_code, trade_date, specs) values = _compute_security_factors(
broker,
ts_code,
trade_date,
specs,
latest_fields=latest_snapshot.get(ts_code),
)
if values: if values:
# 检测并处理异常值 # 检测并处理异常值
@ -660,7 +687,7 @@ def _check_batch_data_availability(
# 使用DataBroker的批次化获取最新字段数据 # 使用DataBroker的批次化获取最新字段数据
required_fields = ["daily.close", "daily_basic.turnover_rate"] required_fields = ["daily.close", "daily_basic.turnover_rate"]
batch_fields_data = broker.fetch_batch_latest(sufficient_codes, trade_date, required_fields) batch_fields_data = broker.fetch_batch_latest(list(sufficient_codes), trade_date, required_fields)
# 检查每个证券的必需字段 # 检查每个证券的必需字段
for ts_code in sufficient_codes: for ts_code in sufficient_codes:
@ -753,6 +780,8 @@ def _compute_security_factors(
ts_code: str, ts_code: str,
trade_date: str, trade_date: str,
specs: Sequence[FactorSpec], specs: Sequence[FactorSpec],
*,
latest_fields: Optional[Mapping[str, object]] = None,
) -> Dict[str, float | None]: ) -> Dict[str, float | None]:
"""计算单个证券的因子值 """计算单个证券的因子值
@ -823,21 +852,14 @@ def _compute_security_factors(
) )
# 获取最新字段值 # 获取最新字段值
latest_fields = broker.fetch_latest( if latest_fields is None:
ts_code, latest_fields = broker.fetch_latest(
trade_date, ts_code,
[ trade_date,
"daily_basic.pe", _LATEST_BASE_FIELDS,
"daily_basic.pb", )
"daily_basic.ps", else:
"daily_basic.turnover_rate", latest_fields = dict(latest_fields)
"daily_basic.volume_ratio",
"daily.close",
"daily.amount",
"daily.vol",
"daily_basic.dv_ratio", # 股息率用于扩展因子
],
)
# 计算各个因子值 # 计算各个因子值
results: Dict[str, float | None] = {} results: Dict[str, float | None] = {}

1387
app/ingest/api_client.py Normal file

File diff suppressed because it is too large Load Diff

395
app/ingest/coverage.py Normal file
View File

@ -0,0 +1,395 @@
"""Data coverage orchestration separated from TuShare API calls."""
from __future__ import annotations
import sqlite3
from datetime import date
from typing import Callable, Dict, List, Optional, Sequence
from app.data.schema import initialize_database
from app.utils.db import db_session
from app.utils.logging import get_logger
from .api_client import (
ETF_CODES,
FUND_CODES,
FUTURE_CODES,
FX_CODES,
HK_CODES,
INDEX_CODES,
LOG_EXTRA,
US_CODES,
_expected_trading_days,
_format_date,
_listing_window,
ensure_stock_basic,
ensure_trade_calendar,
fetch_adj_factor,
fetch_daily_basic,
fetch_daily_bars,
fetch_fund_basic,
fetch_fund_nav,
fetch_fut_basic,
fetch_fut_daily,
fetch_fx_daily,
fetch_hk_daily,
fetch_index_basic,
fetch_index_daily,
fetch_index_dailybasic,
fetch_index_weight,
fetch_suspensions,
fetch_stk_limit,
fetch_trade_calendar,
fetch_us_daily,
save_records,
)
LOGGER = get_logger(__name__)
def _range_stats(
    table: str,
    date_col: str,
    start_str: str,
    end_str: str,
    ts_code: str | None = None,
) -> Dict[str, Optional[str]]:
    """Return min/max date and distinct-day count for *table* in a range.

    Dates are compared as YYYYMMDD strings, so lexicographic order matches
    chronological order. When the table does not exist yet the query raises
    ``sqlite3.OperationalError`` and an empty result is returned instead.
    """
    query = (
        f"SELECT MIN({date_col}) AS min_d, MAX({date_col}) AS max_d, "
        f"COUNT(DISTINCT {date_col}) AS distinct_days FROM {table} "
        f"WHERE {date_col} BETWEEN ? AND ?"
    )
    bind: List[object] = [start_str, end_str]
    if ts_code:
        query = query + " AND ts_code = ?"
        bind.append(ts_code)
    empty: Dict[str, Optional[str]] = {"min": None, "max": None, "distinct": 0}
    try:
        with db_session(read_only=True) as conn:
            row = conn.execute(query, tuple(bind)).fetchone()
    except sqlite3.OperationalError:
        # Table missing (schema not initialized yet): report no coverage.
        return empty
    if not row:
        return empty
    return {
        "min": row["min_d"],
        "max": row["max_d"],
        "distinct": row["distinct_days"],
    }
def _range_needs_refresh(
    table: str,
    date_col: str,
    start_str: str,
    end_str: str,
    expected_days: int = 0,
    **filters: object,
) -> bool:
    """Return True when *table* does not fully cover [start_str, end_str].

    A positive *expected_days* additionally requires at least that many
    distinct trading days inside the range. ``ts_code``/``index_code``
    keyword filters narrow the check to a single security/index.
    """
    code = filters.get("ts_code") or filters.get("index_code")
    stats = _range_stats(table, date_col, start_str, end_str, ts_code=code)  # type: ignore[arg-type]
    lo, hi = stats["min"], stats["max"]
    if lo is None or hi is None:
        # No rows at all in the window.
        return True
    if lo > start_str or hi < end_str:
        # Rows exist but do not reach both edges of the window.
        return True
    return bool(expected_days and (stats["distinct"] or 0) < expected_days)
def _should_skip_range(
    table: str,
    date_col: str,
    start: date,
    end: date,
    ts_code: str | None = None,
) -> bool:
    """Return True when existing rows already cover the requested window.

    For a single security the window is first clipped to its listing
    period; a security whose listing window misses the range entirely is
    skipped outright. Market-wide checks (no *ts_code*) also verify the
    distinct-day density against the trading calendar.
    """
    win_start = _format_date(start)
    win_end = _format_date(end)
    if ts_code:
        list_date, delist_date = _listing_window(ts_code)
        if list_date:
            win_start = max(win_start, list_date)
        if delist_date:
            win_end = min(win_end, delist_date)
        if win_start > win_end:
            LOGGER.debug(
                "股票 %s 在目标区间之外,跳过补数",
                ts_code,
                extra=LOG_EXTRA,
            )
            return True
        stats = _range_stats(table, date_col, win_start, win_end, ts_code=ts_code)
    else:
        stats = _range_stats(table, date_col, win_start, win_end)
    if stats["min"] is None or stats["max"] is None:
        return False
    covered = stats["min"] <= win_start and stats["max"] >= win_end
    if not covered:
        return False
    if ts_code is None:
        # Market-wide: also require enough distinct trading days.
        expected = _expected_trading_days(win_start, win_end)
        if expected and (stats["distinct"] or 0) < expected:
            return False
    return True
def ensure_index_weights(start: date, end: date, index_codes: Optional[Sequence[str]] = None) -> None:
    """Backfill index constituent-weight data for the given window.

    Args:
        start: inclusive start date.
        end: inclusive end date.
        index_codes: indices to check; defaults to the SH/SZ subset of
            ``INDEX_CODES`` (only mainland indices have weight data here).
    """
    if index_codes is None:
        index_codes = [code for code in INDEX_CODES if code.endswith((".SH", ".SZ"))]
    # Hoisted out of the loop: the window strings do not depend on the index.
    start_str = _format_date(start)
    end_str = _format_date(end)
    for index_code in index_codes:
        if _range_needs_refresh("index_weight", "trade_date", start_str, end_str, index_code=index_code):
            LOGGER.info("指数 %s 的成分股权重数据不完整,开始拉取 %s-%s", index_code, start_str, end_str)
            save_records("index_weight", fetch_index_weight(start, end, index_code))
        else:
            LOGGER.info("指数 %s 的成分股权重数据已完整,跳过", index_code)
def ensure_index_dailybasic(start: date, end: date, index_codes: Optional[Sequence[str]] = None) -> None:
    """Backfill index daily-indicator data for the given window.

    Args:
        start: inclusive start date.
        end: inclusive end date.
        index_codes: indices to check; defaults to the SH/SZ subset of
            ``INDEX_CODES``.
    """
    if index_codes is None:
        index_codes = [code for code in INDEX_CODES if code.endswith((".SH", ".SZ"))]
    # Hoisted out of the loop: the window strings do not depend on the index.
    start_str = _format_date(start)
    end_str = _format_date(end)
    for index_code in index_codes:
        if _range_needs_refresh("index_dailybasic", "trade_date", start_str, end_str, ts_code=index_code):
            LOGGER.info("指数 %s 的每日指标数据不完整,开始拉取 %s-%s", index_code, start_str, end_str)
            save_records("index_dailybasic", fetch_index_dailybasic(start, end, index_code))
        else:
            LOGGER.info("指数 %s 的每日指标数据已完整,跳过", index_code)
def ensure_data_coverage(
    start: date,
    end: date,
    ts_codes: Optional[Sequence[str]] = None,
    include_limits: bool = True,
    include_extended: bool = True,
    force: bool = False,
    progress_hook: Callable[[str, float], None] | None = None,
) -> None:
    """Ensure local market data covers [start, end], fetching what is missing.

    Args:
        start: inclusive start date of the target window.
        end: inclusive end date of the target window.
        ts_codes: optional security whitelist; when empty, coverage is
            checked market-wide per table instead of per security.
        include_limits: also refresh price-limit data (``stk_limit``).
        include_extended: also refresh index/fund/futures/FX/overseas data.
        force: re-fetch even when the local range already looks complete.
        progress_hook: optional ``(message, fraction)`` callback for UI
            progress reporting; fraction is clamped to 1.0.
    """
    initialize_database()
    start_str = _format_date(start)
    end_str = _format_date(end)
    # Progress accounting: seven advance() calls always run, plus one for
    # limits and five for the extended block. (The previous 5 + 1/4 count
    # under-counted, so the progress bar saturated at 100% early.)
    extra_steps = 0
    if include_limits:
        extra_steps += 1
    if include_extended:
        extra_steps += 5
    total_steps = 7 + extra_steps
    current_step = 0

    def advance(message: str) -> None:
        # Bump the step counter, notify the hook, and log the stage message.
        nonlocal current_step
        current_step += 1
        progress = min(current_step / total_steps, 1.0)
        if progress_hook:
            progress_hook(message, progress)
        LOGGER.info(message, extra=LOG_EXTRA)

    advance("准备股票基础信息与交易日历")
    ensure_stock_basic()
    ensure_trade_calendar(start, end)
    # De-duplicate the whitelist while preserving caller order.
    codes = tuple(dict.fromkeys(ts_codes)) if ts_codes else tuple()
    expected_days = _expected_trading_days(start_str, end_str)
    advance("处理日线行情数据")
    if codes:
        # Per-security path: fetch only codes whose range is not covered yet.
        pending_codes: List[str] = []
        for code in codes:
            if not force and _should_skip_range("daily", "trade_date", start, end, code):
                LOGGER.info("股票 %s 的日线已覆盖 %s-%s,跳过", code, start_str, end_str)
                continue
            pending_codes.append(code)
        if pending_codes:
            LOGGER.info("开始拉取日线行情:%s-%s(待补股票 %d 支)", start_str, end_str, len(pending_codes))
            save_records(
                "daily",
                fetch_daily_bars(start, end, pending_codes, skip_existing=not force),
            )
    else:
        # Market-wide path: refresh when the aggregate range is incomplete.
        needs_daily = force or _range_needs_refresh("daily", "trade_date", start_str, end_str, expected_days)
        if not needs_daily:
            LOGGER.info("日线数据已覆盖 %s-%s,跳过拉取", start_str, end_str)
        else:
            LOGGER.info("开始拉取日线行情:%s-%s", start_str, end_str)
            save_records(
                "daily",
                fetch_daily_bars(start, end, skip_existing=not force),
            )
    advance("处理指数成分股权重数据")
    ensure_index_weights(start, end)
    advance("处理指数每日指标数据")
    ensure_index_dailybasic(start, end)
    # Date column per table; anything missing defaults to "trade_date".
    date_cols = {
        "daily_basic": "trade_date",
        "adj_factor": "trade_date",
        "stk_limit": "trade_date",
        "suspend": "suspend_date",
        "index_daily": "trade_date",
        "index_dailybasic": "trade_date",
        "index_weight": "trade_date",
        "fund_nav": "nav_date",
        "fut_daily": "trade_date",
        "fx_daily": "trade_date",
        "hk_daily": "trade_date",
        "us_daily": "trade_date",
    }

    def _save_with_codes(table: str, fetch_fn) -> None:
        # Refresh *table* either per security (whitelist given) or market-wide.
        date_col = date_cols.get(table, "trade_date")
        if codes:
            for code in codes:
                if not force and _should_skip_range(table, date_col, start, end, code):
                    LOGGER.info("%s 股票 %s 已覆盖 %s-%s,跳过", table, code, start_str, end_str)
                    continue
                # Fixed log format: the full-width "(" was never closed,
                # producing a malformed message (cf. the market-wide branch).
                LOGGER.info("拉取 %s 表数据(股票:%s)%s-%s", table, code, start_str, end_str)
                rows = fetch_fn(start, end, ts_code=code, skip_existing=not force)
                save_records(table, rows)
        else:
            # "suspend" has no meaningful per-day density, so it is always
            # refreshed unless the range check below applies.
            needs_refresh = force or table == "suspend"
            if not force and table != "suspend":
                expected = expected_days if table in {"daily_basic", "adj_factor", "stk_limit"} else 0
                needs_refresh = _range_needs_refresh(table, date_col, start_str, end_str, expected)
            if not needs_refresh:
                LOGGER.info("%s 已覆盖 %s-%s,跳过", table, start_str, end_str)
                return
            LOGGER.info("拉取 %s 表数据(全市场)%s-%s", table, start_str, end_str)
            rows = fetch_fn(start, end, skip_existing=not force)
            save_records(table, rows)

    advance("处理日线基础指标数据")
    _save_with_codes("daily_basic", fetch_daily_basic)
    advance("处理复权因子数据")
    _save_with_codes("adj_factor", fetch_adj_factor)
    if include_limits:
        advance("处理涨跌停价格数据")
        _save_with_codes("stk_limit", fetch_stk_limit)
    advance("处理停复牌信息")
    _save_with_codes("suspend", fetch_suspensions)
    if include_extended:
        advance("同步指数/基金/期货基础信息")
        save_records("index_basic", fetch_index_basic())
        save_records("fund_basic", fetch_fund_basic())
        save_records("fut_basic", fetch_fut_basic())
        advance("拉取指数行情数据")
        for code in INDEX_CODES:
            if not force and _should_skip_range("index_daily", "trade_date", start, end, code):
                LOGGER.info("指数 %s 已覆盖 %s-%s,跳过", code, start_str, end_str)
                continue
            save_records("index_daily", fetch_index_daily(start, end, code))
        advance("拉取基金净值数据")
        fund_targets = tuple(dict.fromkeys(ETF_CODES + FUND_CODES))
        for code in fund_targets:
            if not force and _should_skip_range("fund_nav", "nav_date", start, end, code):
                LOGGER.info("基金 %s 净值已覆盖 %s-%s,跳过", code, start_str, end_str)
                continue
            save_records("fund_nav", fetch_fund_nav(start, end, code))
        advance("拉取期货/外汇行情数据")
        for code in FUTURE_CODES:
            if not force and _should_skip_range("fut_daily", "trade_date", start, end, code):
                LOGGER.info("期货 %s 已覆盖 %s-%s,跳过", code, start_str, end_str)
                continue
            save_records("fut_daily", fetch_fut_daily(start, end, code))
        for code in FX_CODES:
            if not force and _should_skip_range("fx_daily", "trade_date", start, end, code):
                LOGGER.info("外汇 %s 已覆盖 %s-%s,跳过", code, start_str, end_str)
                continue
            save_records("fx_daily", fetch_fx_daily(start, end, code))
        advance("拉取港/美股行情数据(已暂时关闭)")
        # NOTE(review): the stage message says HK/US fetching is disabled,
        # yet the loops below still run — presumably HK_CODES/US_CODES are
        # empty in api_client; confirm.
        for code in HK_CODES:
            if not force and _should_skip_range("hk_daily", "trade_date", start, end, code):
                LOGGER.info("港股 %s 已覆盖 %s-%s,跳过", code, start_str, end_str)
                continue
            save_records("hk_daily", fetch_hk_daily(start, end, code))
        for code in US_CODES:
            if not force and _should_skip_range("us_daily", "trade_date", start, end, code):
                LOGGER.info("美股 %s 已覆盖 %s-%s,跳过", code, start_str, end_str)
                continue
            save_records("us_daily", fetch_us_daily(start, end, code))
    if progress_hook:
        progress_hook("数据覆盖检查完成", 1.0)
def collect_data_coverage(start: date, end: date) -> Dict[str, Dict[str, object]]:
    """Summarise table-level data coverage for the requested date window.

    For every market-data table, report the min/max observed date and the
    number of distinct days inside [start, end], together with a boolean
    ``meets_expectation`` that is true when the observed range fully spans
    the window (and, where required, matches the expected trading-day
    count).  Headline ``stock_basic`` listing counts are appended as well.
    """
    start_str = _format_date(start)
    end_str = _format_date(end)
    expected_days = _expected_trading_days(start_str, end_str)
    coverage: Dict[str, Dict[str, object]] = {
        "period": {
            "start": start_str,
            "end": end_str,
            "expected_trading_days": expected_days,
        }
    }

    # (table name, date column, whether distinct-day count must reach the
    # expected number of trading days).  Sparse tables such as suspensions
    # and overseas quotes only need to span the window, not fill it.
    table_specs = (
        ("daily", "trade_date", True),
        ("daily_basic", "trade_date", True),
        ("adj_factor", "trade_date", True),
        ("stk_limit", "trade_date", True),
        ("suspend", "suspend_date", False),
        ("index_daily", "trade_date", True),
        ("fund_nav", "nav_date", False),
        ("fut_daily", "trade_date", False),
        ("fx_daily", "trade_date", False),
        ("hk_daily", "trade_date", False),
        ("us_daily", "trade_date", False),
    )
    for table, date_col, require_days in table_specs:
        stats = _range_stats(table, date_col, start_str, end_str)
        lo = stats["min"]
        hi = stats["max"]
        # YYYYMMDD strings compare correctly in lexicographic order.
        covers_window = (
            lo is not None
            and hi is not None
            and lo <= start_str
            and hi >= end_str
            and ((not require_days) or (stats["distinct"] or 0) >= expected_days)
        )
        coverage[table] = {
            "min": lo,
            "max": hi,
            "distinct_days": stats["distinct"],
            "meets_expectation": covers_window,
        }

    with db_session(read_only=True) as conn:
        total_row = conn.execute("SELECT COUNT(*) AS cnt FROM stock_basic").fetchone()
        sse_row = conn.execute(
            "SELECT COUNT(*) AS cnt FROM stock_basic WHERE exchange = 'SSE' AND list_status = 'L'"
        ).fetchone()
        szse_row = conn.execute(
            "SELECT COUNT(*) AS cnt FROM stock_basic WHERE exchange = 'SZSE' AND list_status = 'L'"
        ).fetchone()
        coverage["stock_basic"] = {
            "total": total_row["cnt"] if total_row else 0,
            "sse_listed": sse_row["cnt"] if sse_row else 0,
            "szse_listed": szse_row["cnt"] if szse_row else 0,
        }
    return coverage
# Public API of this module; keep alphabetically sorted.
__all__ = [
    "collect_data_coverage",
    "ensure_data_coverage",
    "ensure_index_dailybasic",
    "ensure_index_weights",
]

File diff suppressed because it is too large Load Diff

View File

@ -11,40 +11,26 @@ from dataclasses import dataclass, field
from datetime import date, datetime, timedelta from datetime import date, datetime, timedelta
from typing import Any, Callable, ClassVar, Dict, Iterable, List, Optional, Sequence, Set, Tuple from typing import Any, Callable, ClassVar, Dict, Iterable, List, Optional, Sequence, Set, Tuple
from .config import get_config
import types import types
from .config import get_config
from .db import db_session from .db import db_session
from .logging import get_logger from .logging import get_logger
from app.core.indicators import momentum, normalize, rolling_mean, volatility from app.core.indicators import momentum, normalize, rolling_mean, volatility
from app.utils.db_query import BrokerQueryEngine from app.utils.db_query import BrokerQueryEngine
from app.utils import alerts from app.utils import alerts
from app.ingest.coverage import collect_data_coverage as _collect_coverage, ensure_data_coverage as _ensure_coverage
# 延迟导入,避免循环依赖 try:
collect_data_coverage = None from app.data.schema import initialize_database
ensure_data_coverage = None except ImportError:
initialize_database = None def initialize_database():
"""Fallback stub used when the real initializer cannot be imported.
# 在模块加载时尝试导入 Return a lightweight object with the attributes callers expect
if collect_data_coverage is None or ensure_data_coverage is None: (executed, skipped, missing_tables) so code that calls
try: `initialize_database()` can safely inspect the result.
from app.ingest.tushare import collect_data_coverage, ensure_data_coverage """
except ImportError: return types.SimpleNamespace(executed=0, skipped=True, missing_tables=[])
# 导入失败时,在实际使用时会报错
pass
if initialize_database is None:
try:
from app.data.schema import initialize_database
except ImportError:
# 导入失败时,提供一个空实现
def initialize_database():
"""Fallback stub used when the real initializer cannot be imported.
Return a lightweight object with the attributes callers expect
(executed, skipped, missing_tables) so code that calls
`initialize_database()` can safely inspect the result.
"""
return types.SimpleNamespace(executed=0, skipped=True, missing_tables=[])
LOGGER = get_logger(__name__) LOGGER = get_logger(__name__)
LOG_EXTRA = {"stage": "data_broker"} LOG_EXTRA = {"stage": "data_broker"}
@ -56,6 +42,27 @@ def _is_safe_identifier(name: str) -> bool:
return bool(_IDENTIFIER_RE.match(name)) return bool(_IDENTIFIER_RE.match(name))
def _default_coverage_runner(start: date, end: date) -> None:
if _ensure_coverage is None:
LOGGER.debug("默认补数函数不可用,跳过自动补数", extra=LOG_EXTRA)
return
_ensure_coverage(
start,
end,
include_limits=False,
include_extended=False,
force=False,
progress_hook=None,
)
def _default_coverage_collector(start: date, end: date) -> Dict[str, Dict[str, object]]:
if _collect_coverage is None:
LOGGER.debug("默认覆盖统计函数不可用,返回空结果", extra=LOG_EXTRA)
return {}
return _collect_coverage(start, end)
def _safe_split(path: str) -> Tuple[str, str] | None: def _safe_split(path: str) -> Tuple[str, str] | None:
if "." not in path: if "." not in path:
return None return None
@ -197,6 +204,8 @@ class DataBroker:
enable_cache: bool = True enable_cache: bool = True
latest_cache_size: int = 256 latest_cache_size: int = 256
series_cache_size: int = 512 series_cache_size: int = 512
coverage_runner: Callable[[date, date], None] = field(default=_default_coverage_runner)
coverage_collector: Callable[[date, date], Dict[str, Dict[str, object]]] = field(default=_default_coverage_collector)
_latest_cache: OrderedDict = field(init=False, repr=False) _latest_cache: OrderedDict = field(init=False, repr=False)
_series_cache: OrderedDict = field(init=False, repr=False) _series_cache: OrderedDict = field(init=False, repr=False)
# 补数相关状态管理 # 补数相关状态管理
@ -1234,14 +1243,13 @@ class DataBroker:
return False return False
# 收集数据覆盖情况 # 收集数据覆盖情况
if collect_data_coverage is None: if self.coverage_collector is None:
LOGGER.error("collect_data_coverage 函数不可用,请检查导入配置", extra=LOG_EXTRA) LOGGER.debug("未配置覆盖统计函数,无法判断是否需要补数", extra=LOG_EXTRA)
return False return False
coverage = collect_data_coverage( start_d = datetime.strptime(start_date, "%Y%m%d").date()
date.fromisoformat(start_date[:4] + '-' + start_date[4:6] + '-' + start_date[6:8]), end_d = datetime.strptime(end_date, "%Y%m%d").date()
date.fromisoformat(end_date[:4] + '-' + end_date[4:6] + '-' + end_date[6:8]) coverage = self.coverage_collector(start_d, end_d)
)
# 保存到缓存 # 保存到缓存
coverage['timestamp'] = time.time() if hasattr(time, 'time') else 0 coverage['timestamp'] = time.time() if hasattr(time, 'time') else 0
@ -1285,18 +1293,14 @@ class DataBroker:
LOGGER.info("开始后台数据补数: %s%s", start_date, end_date, extra=LOG_EXTRA) LOGGER.info("开始后台数据补数: %s%s", start_date, end_date, extra=LOG_EXTRA)
# 执行补数 # 执行补数
if ensure_data_coverage is None: if self.coverage_runner is None:
LOGGER.error("ensure_data_coverage 函数不可用,请检查导入配置", extra=LOG_EXTRA) LOGGER.debug("未配置覆盖补数函数,跳过自动补数", extra=LOG_EXTRA)
with self._refresh_lock: with self._refresh_lock:
self._refresh_in_progress[refresh_key] = False self._refresh_in_progress[refresh_key] = False
self._refresh_callbacks.pop(refresh_key, None)
return return
ensure_data_coverage( self.coverage_runner(start_date, end_date)
start_date,
end_date,
force=False,
progress_hook=None
)
LOGGER.info("后台数据补数完成: %s%s", start_date, end_date, extra=LOG_EXTRA) LOGGER.info("后台数据补数完成: %s%s", start_date, end_date, extra=LOG_EXTRA)
@ -1661,13 +1665,11 @@ class DataBroker:
start_d = date.fromisoformat(start.strftime('%Y-%m-%d')) start_d = date.fromisoformat(start.strftime('%Y-%m-%d'))
end_d = date.fromisoformat(end.strftime('%Y-%m-%d')) end_d = date.fromisoformat(end.strftime('%Y-%m-%d'))
# 收集数据覆盖情况 if self.coverage_collector is None:
if collect_data_coverage is None: LOGGER.debug("未配置覆盖统计函数,返回空覆盖结果", extra=LOG_EXTRA)
LOGGER.error("collect_data_coverage 函数不可用,请检查导入配置", extra=LOG_EXTRA)
return {} return {}
coverage = collect_data_coverage(start_d, end_d) return self.coverage_collector(start_d, end_d)
return coverage
except Exception as exc: except Exception as exc:
LOGGER.exception("获取数据覆盖情况失败: %s", exc, extra=LOG_EXTRA) LOGGER.exception("获取数据覆盖情况失败: %s", exc, extra=LOG_EXTRA)
return {} return {}

View File

@ -0,0 +1,58 @@
"""Shared feature snapshot helpers built on top of DataBroker."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, Iterable, Mapping, Optional, Sequence
from .data_access import DataBroker
@dataclass
class FeatureSnapshotService:
    """Batch-oriented reader for the latest feature values of many symbols.

    Thin facade over ``DataBroker.fetch_batch_latest`` so that callers can
    request snapshots for a whole universe (or a single symbol) without
    touching broker internals.

    NOTE(review): the ``@dataclass`` decorator keeps the hand-written
    ``__init__`` below (dataclasses never overwrite a user-defined one) and
    only contributes ``__repr__``/``__eq__``; confirm that is intentional.
    """

    # Underlying data source; a fresh DataBroker is created when none is given.
    broker: DataBroker

    def __init__(self, broker: Optional[DataBroker] = None) -> None:
        self.broker = broker if broker is not None else DataBroker()

    def load_latest(
        self,
        trade_date: str,
        fields: Sequence[str],
        ts_codes: Sequence[str],
        *,
        auto_refresh: bool = False,
    ) -> Dict[str, Dict[str, object]]:
        """Return a {ts_code -> {field -> value}} snapshot for the universe."""
        if not ts_codes:
            # Empty universe: nothing to query, skip the broker entirely.
            return {}
        universe = list(ts_codes)
        return self.broker.fetch_batch_latest(
            universe,
            trade_date,
            fields,
            auto_refresh=auto_refresh,
        )

    def load_single(
        self,
        trade_date: str,
        ts_code: str,
        fields: Iterable[str],
        *,
        auto_refresh: bool = False,
    ) -> Mapping[str, object]:
        """Snapshot for one symbol; empty mapping when no data is found."""
        batch = self.load_latest(
            trade_date,
            list(fields),
            [ts_code],
            auto_refresh=auto_refresh,
        )
        return batch.get(ts_code, {})
__all__ = ["FeatureSnapshotService"]

View File

@ -79,4 +79,4 @@
| --- | --- | --- | | --- | --- | --- |
| 全量代码审查 | 🔄 | 已制定 `docs/architecture/code_review_checklist.md`,按 checklist 推进模块审查。 | | 全量代码审查 | 🔄 | 已制定 `docs/architecture/code_review_checklist.md`,按 checklist 推进模块审查。 |
| TODO 标记治理 | 🔄 | 新增 `scripts/todo_report.py` 支撑定期扫描,待梳理遗留项目。 | | TODO 标记治理 | 🔄 | 新增 `scripts/todo_report.py` 支撑定期扫描,待梳理遗留项目。 |
| 业务逻辑体检 | ⏳ | 梳理业务链路,识别需要重构或优化的模块。 | | 业务逻辑体检 | ✅ | 梳理业务链路完成,已拆分采集/覆盖/决策模块;详见 docs/architecture/business_logic_healthcheck.md。 |

View File

@ -0,0 +1,50 @@
# 业务逻辑体检报告
本报告梳理当前端到端业务链路,并标注出影响可维护性与扩展性的关键风险点,供后续重构排期参考。
## 端到端链路速览
- **数据采集与健康巡检**:命令行入口 `scripts/run_ingestion_job.py` 通过编排层 `app/ingest/tushare.py`,调用 `app/ingest/api_client.py``app/ingest/coverage.py` 完成 TuShare 拉数、数据补齐与指标巡检。
- **数据接入与覆盖治理**`DataBroker` (`app/utils/data_access.py`) 负责字段解析、缓存、派生指标,自动补数由可注入的 `coverage_runner` 承担;批量快照能力由 `FeatureSnapshotService` (`app/utils/feature_snapshots.py`) 暴露给上层。
- **因子与特征加工**`compute_factors` 系列 (`app/features/factors.py`) 借助批量快照与分批校验,输出持久化特征供代理与回测消费。
- **多智能体决策**`DecisionWorkflow` (`app/agents/game.py`) 将议程控制、部门投票、风险审查、执行总结拆分为可维护的阶段,驱动规则代理与部门 LLM 协同。
- **回测与调参与强化学习**`BacktestEngine.load_market_data` (`app/backtest/engine.py`) 使用批量快照聚合特征,`DecisionEnv` (`app/backtest/decision_env.py`) 暴露 RL 行为接口。
- **可视化与运营面板**Streamlit 入口 `app/ui/streamlit_app.py:14-120` 触发数据库初始化、自动补数与多页可视化,消费上述链路的产物。
## 主要发现
### 1. 数据采集模块拆分完成但仍需扩展
- 采集 orchestrator 已收敛在 `app/ingest/tushare.py`API 调用与覆盖校验分别由 `app/ingest/api_client.py`、`app/ingest/coverage.py` 承担,后续可考虑将因子计算改为显式队列任务。
- `run_ingestion` 通过 `post_tasks` 钩子触发默认因子回填,方便引入异步或多阶段处理策略。
### 2. 数据访问层职责下沉但仍偏厚重
- `DataBroker` 引入可注入的 `coverage_runner` 与批量缓存接口,不过派生指标、行业分析仍集中在同一类,可进一步拆分至子组件。
- 自动补数与覆盖统计改为显式依赖 `app/ingest/coverage.py`,消除了懒加载带来的环状依赖风险。
### 3. 因子流水线新增批量快照
- `FeatureSnapshotService` 批量预取最新字段,`compute_factors` 改为按批拼装特征并共用缓存,减少了重复 SQL。
- 校验与进度汇报依旧集中在 `_compute_batch_factors`,后续可继续剥离统计与写库逻辑以优化测试粒度。
### 4. 多智能体决策流程模块化
- `DecisionWorkflow` 将部门投票、风险审查、执行总结拆分为独立方法,便于插桩和单元测试。
- 部门代理仍直接访问 `DataBroker`,后续可对接 `FeatureSnapshotService` 或数据域策略,统一数据获取边界。
### 5. 回测链路复用批量特征
- `BacktestEngine.load_market_data` 与因子流水线共用快照服务,避免重复的最新值查询。
- 强化学习环境仍按日重新构造 `BacktestEngine`,可在后续迭代中缓存快照或拆分环境状态以进一步加速。
## 优先级建议
1. **完善采集流水线的任务编排**(高优先级)
现已拆分 API/覆盖/编排层,建议继续将因子计算与其它后置动作放入独立任务队列,便于并发执行与重试。
2. **解耦 DataBroker 的派生职责**(中高优先级)
将行业、情绪、派生指标等逻辑抽出为独立服务,保留 DataBroker 专注于字段解析与缓存;同步补充更细粒度的单元测试。
3. **推广特征快照到部门代理**(中优先级)
目前因子与回测已复用 `FeatureSnapshotService`,建议在 LLM 部门工具调用中也接入统一快照,降低重复 SQL。
4. **补齐 DecisionWorkflow 测试与监控**(中优先级)
`DecisionWorkflow` 各阶段编写单元/集成测试,并将风险评审与信念修正暴露在监控面板中,便于审计。
5. **建立性能与回归基线**(低优先级)
构造包含快照缓存的基准数据集,度量因子计算和回测的时延,对后续优化提供数据支持。
以上建议可依次推进,亦可按业务优先级穿插执行。