473 lines
14 KiB
Python
473 lines
14 KiB
Python
"""
数据处理和定时任务模块单元测试

测试数据处理器和定时任务调度器的功能
"""
|
|
|
|
import pytest
|
|
import asyncio
|
|
from unittest.mock import Mock, patch, AsyncMock
|
|
from datetime import datetime, timedelta
|
|
|
|
from src.data.data_processor import DataProcessor
|
|
from src.scheduler.task_scheduler import TaskScheduler
|
|
from src.utils.exceptions import DataProcessingError
|
|
|
|
|
|
class TestDataProcessor:
    """Tests for ``DataProcessor``.

    Each test feeds hand-written records keyed by Chinese column names through
    one processor method and checks either the normalised output or that a
    ``DataProcessingError`` is raised.
    """

    @pytest.fixture
    def data_processor(self):
        """Return a new ``DataProcessor`` for every test."""
        return DataProcessor()

    def test_process_stock_basic_info_success(self, data_processor):
        """Every basic-info column is carried over under its English key."""
        source_row = {
            "代码": "000001",
            "名称": "平安银行",
            "市场": "主板",
            "行业": "银行",
            "地区": "广东",
            "上市日期": "1991-04-03",
            "数据源": "akshare",
        }
        want = {
            "code": "000001",
            "name": "平安银行",
            "market": "主板",
            "industry": "银行",
            "area": "广东",
            "ipo_date": "1991-04-03",
            "data_source": "akshare",
        }

        rows = data_processor.process_stock_basic_info([source_row])

        assert len(rows) == 1
        row = rows[0]
        for key, value in want.items():
            assert row[key] == value, key

    def test_process_stock_basic_info_missing_fields(self, data_processor):
        """Missing optional columns are filled with default values."""
        # Only code and name are supplied; every other column is absent.
        sparse_rows = [{"代码": "000001", "名称": "平安银行"}]

        rows = data_processor.process_stock_basic_info(sparse_rows)

        assert len(rows) == 1
        row = rows[0]
        assert row["code"] == "000001"
        assert row["name"] == "平安银行"
        # Text columns default to the "unknown" placeholder, the date to None.
        for key in ("market", "industry", "area"):
            assert row[key] == "未知", key
        assert row["ipo_date"] is None

    def test_process_daily_kline_data_success(self, data_processor):
        """A daily bar is tagged with the stock code and mapped to English keys."""
        bar = {
            "日期": "2024-01-15",
            "开盘": 10.5,
            "最高": 11.2,
            "最低": 10.3,
            "收盘": 10.8,
            "成交量": 1_000_000,
            "成交额": 10_800_000,
            "数据源": "akshare",
        }
        want = {
            "code": "000001",
            "date": "2024-01-15",
            "open": 10.5,
            "high": 11.2,
            "low": 10.3,
            "close": 10.8,
            "volume": 1_000_000,
            "amount": 10_800_000,
            "data_source": "akshare",
        }

        rows = data_processor.process_daily_kline_data("000001", [bar])

        assert len(rows) == 1
        for key, value in want.items():
            assert rows[0][key] == value, key

    def test_process_daily_kline_data_invalid_values(self, data_processor):
        """A non-numeric price makes the processor raise DataProcessingError."""
        bad_bar = {
            "日期": "2024-01-15",
            "开盘": "无效值",  # a string where a number is expected
            "最高": 11.2,
            "最低": 10.3,
            "收盘": 10.8,
            "成交量": 1_000_000,
            "成交额": 10_800_000,
        }

        with pytest.raises(DataProcessingError):
            data_processor.process_daily_kline_data("000001", [bad_bar])

    def test_process_financial_report_data_success(self, data_processor):
        """A report row is tagged with the stock code and mapped to English keys."""
        report = {
            "报告期": "2023-12-31",
            "报告类型": "年报",
            "每股收益": 1.5,
            "净利润": 1_500_000_000,
            "营业收入": 5_000_000_000,
            "总资产": 10_000_000_000,
            "数据源": "akshare",
        }
        want = {
            "code": "000001",
            "report_date": "2023-12-31",
            "report_type": "年报",
            "eps": 1.5,
            "net_profit": 1_500_000_000,
            "revenue": 5_000_000_000,
            "total_assets": 10_000_000_000,
            "data_source": "akshare",
        }

        rows = data_processor.process_financial_report_data("000001", [report])

        assert len(rows) == 1
        for key, value in want.items():
            assert rows[0][key] == value, key

    def test_validate_data_success(self, data_processor):
        """Records that contain every required field validate to True."""
        records = [
            {"code": "000001", "name": "平安银行", "market": "主板", "ipo_date": "1991-04-03"},
        ]

        assert data_processor._validate_data(records, ["code", "name"]) is True

    def test_validate_data_missing_required_fields(self, data_processor):
        """A record lacking a required field raises DataProcessingError."""
        records = [{"code": "000001"}]  # "name" is required but missing

        with pytest.raises(DataProcessingError):
            data_processor._validate_data(records, ["code", "name"])

    def test_clean_data_success(self, data_processor):
        """Surrounding whitespace is stripped from the stock code."""
        records = [
            {"code": " 000001 ", "name": "平安银行", "market": "主板", "ipo_date": "1991-04-03"},
        ]

        cleaned = data_processor._clean_data(records)

        assert len(cleaned) == 1
        assert cleaned[0]["code"] == "000001"

    def test_standardize_data_success(self, data_processor):
        """Keys are renamed according to the supplied field mapping."""
        source_rows = [{"代码": "000001", "名称": "平安银行", "市场": "主板"}]
        mapping = {"代码": "code", "名称": "name", "市场": "market"}
        want = {"code": "000001", "name": "平安银行", "market": "主板"}

        rows = data_processor._standardize_data(source_rows, mapping)

        assert len(rows) == 1
        row = rows[0]
        for key in want:
            assert key in row, key
        for key, value in want.items():
            assert row[key] == value, key
|
|
|
|
|
|
class TestTaskScheduler:
    """Tests for ``TaskScheduler``.

    Covers the start/stop lifecycle, the cron configuration of each built-in
    job, the job callbacks (with ``data_manager`` / ``stock_repo`` patched),
    and job add/remove/status bookkeeping on a started scheduler.
    """

    @pytest.fixture
    def task_scheduler(self):
        """Provide a fresh, not-yet-started ``TaskScheduler`` for each test."""
        return TaskScheduler()

    @staticmethod
    async def _start_without_jobs(scheduler):
        """Start *scheduler* with ``_configure_jobs`` patched out.

        Returns whatever ``scheduler.start()`` returns. ``start`` is awaited
        throughout this suite, so it must be awaited here as well: calling it
        without ``await`` only builds a coroutine object and never starts the
        scheduler.
        """
        with patch.object(scheduler, "_configure_jobs"):
            return await scheduler.start()

    @staticmethod
    def _add_future_date_job(scheduler, job_id):
        """Register a no-op one-shot ``"date"`` job due one hour from now."""

        def noop_job():
            pass

        scheduler._add_job(job_id, "date", noop_job, {"run_date": datetime.now() + timedelta(hours=1)})

    @staticmethod
    def _assert_single_cron_job(mock_add_job, job_id, callback, trigger_args):
        """Assert ``_add_job`` was called exactly once, positionally, as
        ``(job_id, "cron", callback, trigger_args)``."""
        mock_add_job.assert_called_once()
        call_args = mock_add_job.call_args[0]
        assert call_args[0] == job_id
        assert call_args[1] == "cron"
        assert call_args[2] == callback
        assert call_args[3] == trigger_args

    def test_scheduler_initialization(self, task_scheduler):
        """A new scheduler has no underlying scheduler and is not running."""
        assert task_scheduler.scheduler is None
        assert task_scheduler.is_running is False

    @pytest.mark.asyncio
    async def test_start_scheduler_success(self, task_scheduler):
        """start() returns True, marks the scheduler running and creates it."""
        result = await self._start_without_jobs(task_scheduler)
        try:
            assert result is True
            assert task_scheduler.is_running is True
            assert task_scheduler.scheduler is not None
        finally:
            # Don't leave a running scheduler behind for later tests.
            await task_scheduler.stop()

    @pytest.mark.asyncio
    async def test_stop_scheduler_success(self, task_scheduler):
        """stop() on a started scheduler returns True and clears is_running."""
        await self._start_without_jobs(task_scheduler)

        result = await task_scheduler.stop()

        assert result is True
        assert task_scheduler.is_running is False

    @pytest.mark.asyncio
    async def test_stop_scheduler_not_running(self, task_scheduler):
        """stop() on a scheduler that was never started returns False."""
        result = await task_scheduler.stop()

        assert result is False

    def test_configure_daily_kline_job(self, task_scheduler):
        """The daily K-line update is a cron job at 18:00."""
        with patch.object(task_scheduler, "_add_job") as mock_add_job:
            task_scheduler._configure_daily_kline_job()

            self._assert_single_cron_job(
                mock_add_job,
                "daily_kline_update",
                task_scheduler._update_daily_kline_data,
                {"hour": 18, "minute": 0},  # 6 p.m.
            )

    def test_configure_weekly_financial_job(self, task_scheduler):
        """The weekly financial update is a cron job on day_of_week 0 at 20:00."""
        with patch.object(task_scheduler, "_add_job") as mock_add_job:
            task_scheduler._configure_weekly_financial_job()

            # NOTE(review): the original comment says "Sunday 8 p.m.", but if
            # these args reach APScheduler's CronTrigger, day_of_week=0 means
            # Monday (APScheduler counts mon=0 .. sun=6). Confirm the intended
            # day against the scheduler implementation.
            self._assert_single_cron_job(
                mock_add_job,
                "weekly_financial_update",
                task_scheduler._update_financial_data,
                {"day_of_week": 0, "hour": 20, "minute": 0},
            )

    def test_configure_monthly_basic_job(self, task_scheduler):
        """The monthly basic-info update is a cron job on day 1 at 22:00."""
        with patch.object(task_scheduler, "_add_job") as mock_add_job:
            task_scheduler._configure_monthly_basic_job()

            self._assert_single_cron_job(
                mock_add_job,
                "monthly_basic_update",
                task_scheduler._update_stock_basic_info,
                {"day": 1, "hour": 22, "minute": 0},  # 1st of the month, 10 p.m.
            )

    def test_configure_daily_health_check_job(self, task_scheduler):
        """The daily health check is a cron job at 09:00."""
        with patch.object(task_scheduler, "_add_job") as mock_add_job:
            task_scheduler._configure_daily_health_check_job()

            self._assert_single_cron_job(
                mock_add_job,
                "daily_health_check",
                task_scheduler._health_check,
                {"hour": 9, "minute": 0},  # 9 a.m.
            )

    @pytest.mark.asyncio
    async def test_update_daily_kline_data_success(self, task_scheduler):
        """The K-line job returns True when fetching and saving both succeed."""
        mock_data = [{"code": "000001", "date": "2024-01-15", "open": 10.5, "close": 10.8}]

        with patch.object(task_scheduler.data_manager, "get_daily_kline_data", return_value=mock_data):
            with patch.object(task_scheduler.stock_repo, "save_daily_kline_data", return_value=True):
                result = await task_scheduler._update_daily_kline_data()

        assert result is True

    @pytest.mark.asyncio
    async def test_update_daily_kline_data_failure(self, task_scheduler):
        """The K-line job reports False instead of raising when fetching fails."""
        with patch.object(task_scheduler.data_manager, "get_daily_kline_data", side_effect=Exception("API错误")):
            result = await task_scheduler._update_daily_kline_data()

        assert result is False

    @pytest.mark.asyncio
    async def test_update_financial_data_success(self, task_scheduler):
        """The financial job returns True when fetching and saving both succeed."""
        mock_data = [{"code": "000001", "report_date": "2023-12-31", "eps": 1.5}]

        with patch.object(task_scheduler.data_manager, "get_financial_report", return_value=mock_data):
            with patch.object(task_scheduler.stock_repo, "save_financial_report_data", return_value=True):
                result = await task_scheduler._update_financial_data()

        assert result is True

    @pytest.mark.asyncio
    async def test_update_stock_basic_info_success(self, task_scheduler):
        """The basic-info job returns True when fetching and saving both succeed."""
        mock_data = [{"code": "000001", "name": "平安银行", "market": "主板"}]

        with patch.object(task_scheduler.data_manager, "get_stock_basic_info", return_value=mock_data):
            with patch.object(task_scheduler.stock_repo, "save_stock_basic_info", return_value=True):
                result = await task_scheduler._update_stock_basic_info()

        assert result is True

    @pytest.mark.asyncio
    async def test_health_check_success(self, task_scheduler):
        """The health check returns True when both data paths respond."""
        with patch.object(task_scheduler.data_manager, "get_stock_basic_info", return_value=[]):
            with patch.object(task_scheduler.stock_repo, "get_stock_basic_info", return_value=[]):
                result = await task_scheduler._health_check()

        assert result is True

    @pytest.mark.asyncio
    async def test_get_job_status(self, task_scheduler):
        """A job added to a started scheduler is reported with a next run time."""
        await self._start_without_jobs(task_scheduler)
        try:
            self._add_future_date_job(task_scheduler, "test_job")

            status = task_scheduler.get_job_status()

            assert "test_job" in status
            assert status["test_job"]["next_run_time"] is not None
        finally:
            await task_scheduler.stop()

    @pytest.mark.asyncio
    async def test_remove_job_success(self, task_scheduler):
        """remove_job() returns True and the job disappears from the status."""
        await self._start_without_jobs(task_scheduler)
        try:
            self._add_future_date_job(task_scheduler, "test_job")

            result = task_scheduler.remove_job("test_job")

            assert result is True
            assert "test_job" not in task_scheduler.get_job_status()
        finally:
            await task_scheduler.stop()

    @pytest.mark.asyncio
    async def test_remove_job_not_found(self, task_scheduler):
        """remove_job() returns False for an unknown job id."""
        await self._start_without_jobs(task_scheduler)
        try:
            result = task_scheduler.remove_job("nonexistent_job")

            assert result is False
        finally:
            await task_scheduler.stop()
|
|
|
|
|
|
class TestIntegration:
    """Integration-style tests that use real ``DataProcessor`` and
    ``TaskScheduler`` instances. Only the scheduler's job configuration is
    patched out."""

    def test_data_processing_pipeline(self):
        """Raw basic-info records come out with English keys and the same values."""
        # Synchronous on purpose: nothing here is awaited, so the test needs
        # neither the asyncio marker nor an event loop.
        processor = DataProcessor()

        raw_stock_data = [
            {
                "代码": "000001",
                "名称": "平安银行",
                "市场": "主板",
                "行业": "银行",
                "地区": "广东",
                "上市日期": "1991-04-03",
            }
        ]

        processed_data = processor.process_stock_basic_info(raw_stock_data)

        assert len(processed_data) == 1
        assert processed_data[0]["code"] == "000001"
        assert processed_data[0]["name"] == "平安银行"
        assert processed_data[0]["market"] == "主板"
        assert processed_data[0]["industry"] == "银行"
        assert processed_data[0]["area"] == "广东"
        assert processed_data[0]["ipo_date"] == "1991-04-03"

    @pytest.mark.asyncio
    async def test_scheduler_integration(self):
        """A scheduler can be started and then stopped again."""
        scheduler = TaskScheduler()

        with patch.object(scheduler, "_configure_jobs"):
            started = await scheduler.start()

        try:
            assert started is True
            assert scheduler.is_running is True
        finally:
            # Always stop, even if an assertion above failed, so a running
            # scheduler is not leaked into later tests.
            stopped = await scheduler.stop()

        assert stopped is True
        assert scheduler.is_running is False