# Source file: stock/tests/test_data_processor.py
# (viewer metadata: 473 lines, 14 KiB, Python)

"""
Unit tests for the data-processing and scheduled-task modules.

Covers the behaviour of the data processor and the task scheduler.
"""
import pytest
import asyncio
from unittest.mock import Mock, patch, AsyncMock
from datetime import datetime, timedelta
from src.data.data_processor import DataProcessor
from src.scheduler.task_scheduler import TaskScheduler
from src.utils.exceptions import DataProcessingError
class TestDataProcessor:
    """Unit tests for ``DataProcessor``."""

    @pytest.fixture
    def data_processor(self):
        """Provide a fresh ``DataProcessor`` instance for each test."""
        processor = DataProcessor()
        return processor

    def test_process_stock_basic_info_success(self, data_processor):
        """A fully populated raw row is exposed under the English field names."""
        source_row = {
            "代码": "000001",
            "名称": "平安银行",
            "市场": "主板",
            "行业": "银行",
            "地区": "广东",
            "上市日期": "1991-04-03",
            "数据源": "akshare",
        }
        expected = {
            "code": "000001",
            "name": "平安银行",
            "market": "主板",
            "industry": "银行",
            "area": "广东",
            "ipo_date": "1991-04-03",
            "data_source": "akshare",
        }

        result = data_processor.process_stock_basic_info([source_row])

        assert len(result) == 1
        record = result[0]
        for field, value in expected.items():
            assert record[field] == value

    def test_process_stock_basic_info_missing_fields(self, data_processor):
        """Fields absent from the raw row fall back to their default values."""
        # Only the code and the name are supplied; every other field is missing.
        source_row = {
            "代码": "000001",
            "名称": "平安银行",
        }

        result = data_processor.process_stock_basic_info([source_row])

        assert len(result) == 1
        record = result[0]
        assert record["code"] == "000001"
        assert record["name"] == "平安银行"
        # Text fields default to the "unknown" placeholder string.
        for defaulted_field in ("market", "industry", "area"):
            assert record[defaulted_field] == "未知"
        # The listing date defaults to None.
        assert record["ipo_date"] is None

    def test_process_daily_kline_data_success(self, data_processor):
        """A valid daily K-line row is converted and tagged with the stock code."""
        source_row = {
            "日期": "2024-01-15",
            "开盘": 10.5,
            "最高": 11.2,
            "最低": 10.3,
            "收盘": 10.8,
            "成交量": 1000000,
            "成交额": 10800000,
            "数据源": "akshare",
        }
        expected = {
            "code": "000001",
            "date": "2024-01-15",
            "open": 10.5,
            "high": 11.2,
            "low": 10.3,
            "close": 10.8,
            "volume": 1000000,
            "amount": 10800000,
            "data_source": "akshare",
        }

        result = data_processor.process_daily_kline_data("000001", [source_row])

        assert len(result) == 1
        record = result[0]
        for field, value in expected.items():
            assert record[field] == value

    def test_process_daily_kline_data_invalid_values(self, data_processor):
        """A non-numeric price in a K-line row raises ``DataProcessingError``."""
        source_row = {
            "日期": "2024-01-15",
            "开盘": "无效值",  # a string where a number is expected
            "最高": 11.2,
            "最低": 10.3,
            "收盘": 10.8,
            "成交量": 1000000,
            "成交额": 10800000,
        }

        with pytest.raises(DataProcessingError):
            data_processor.process_daily_kline_data("000001", [source_row])

    def test_process_financial_report_data_success(self, data_processor):
        """A valid financial-report row is converted and tagged with the stock code."""
        source_row = {
            "报告期": "2023-12-31",
            "报告类型": "年报",
            "每股收益": 1.5,
            "净利润": 1500000000,
            "营业收入": 5000000000,
            "总资产": 10000000000,
            "数据源": "akshare",
        }
        expected = {
            "code": "000001",
            "report_date": "2023-12-31",
            "report_type": "年报",
            "eps": 1.5,
            "net_profit": 1500000000,
            "revenue": 5000000000,
            "total_assets": 10000000000,
            "data_source": "akshare",
        }

        result = data_processor.process_financial_report_data("000001", [source_row])

        assert len(result) == 1
        record = result[0]
        for field, value in expected.items():
            assert record[field] == value

    def test_validate_data_success(self, data_processor):
        """Validation returns ``True`` when every required field is present."""
        complete_row = {
            "code": "000001",
            "name": "平安银行",
            "market": "主板",
            "ipo_date": "1991-04-03",
        }
        required_fields = ["code", "name"]

        outcome = data_processor._validate_data([complete_row], required_fields)

        assert outcome is True

    def test_validate_data_missing_required_fields(self, data_processor):
        """Validation raises ``DataProcessingError`` when a required field is missing."""
        # The "name" field is deliberately left out.
        incomplete_row = {"code": "000001"}
        required_fields = ["code", "name"]

        with pytest.raises(DataProcessingError):
            data_processor._validate_data([incomplete_row], required_fields)

    def test_clean_data_success(self, data_processor):
        """Cleaning strips surrounding whitespace from the stock code."""
        padded_row = {
            "code": " 000001 ",  # leading and trailing spaces
            "name": "平安银行",
            "market": "主板",
            "ipo_date": "1991-04-03",
        }

        cleaned = data_processor._clean_data([padded_row])

        assert len(cleaned) == 1
        assert cleaned[0]["code"] == "000001"  # spaces removed

    def test_standardize_data_success(self, data_processor):
        """Standardisation renames keys according to the supplied field mapping."""
        source_row = {
            "代码": "000001",
            "名称": "平安银行",
            "市场": "主板",
        }
        field_mapping = {
            "代码": "code",
            "名称": "name",
            "市场": "market",
        }
        expected = {
            "code": "000001",
            "name": "平安银行",
            "market": "主板",
        }

        result = data_processor._standardize_data([source_row], field_mapping)

        assert len(result) == 1
        record = result[0]
        # Every mapped key must exist before its value is compared.
        for field in expected:
            assert field in record
        for field, value in expected.items():
            assert record[field] == value
class TestTaskScheduler:
    """Unit tests for ``TaskScheduler``."""

    @pytest.fixture
    def task_scheduler(self):
        """Provide a fresh ``TaskScheduler`` instance for each test."""
        return TaskScheduler()

    def test_scheduler_initialization(self, task_scheduler):
        """A new scheduler has no underlying scheduler object and is not running."""
        assert task_scheduler.scheduler is None
        assert task_scheduler.is_running is False

    @pytest.mark.asyncio
    async def test_start_scheduler_success(self, task_scheduler):
        """Starting the scheduler returns ``True`` and marks it as running."""
        # Job configuration is patched out so that only start-up is exercised.
        with patch.object(task_scheduler, "_configure_jobs"):
            result = await task_scheduler.start()
        try:
            assert result is True
            assert task_scheduler.is_running is True
            assert task_scheduler.scheduler is not None
        finally:
            # Stop the scheduler so it does not outlive this test.
            await task_scheduler.stop()

    @pytest.mark.asyncio
    async def test_stop_scheduler_success(self, task_scheduler):
        """Stopping a running scheduler returns ``True`` and clears the running flag."""
        # Start the scheduler first.
        with patch.object(task_scheduler, "_configure_jobs"):
            await task_scheduler.start()

        # Then stop it.
        result = await task_scheduler.stop()

        assert result is True
        assert task_scheduler.is_running is False

    @pytest.mark.asyncio
    async def test_stop_scheduler_not_running(self, task_scheduler):
        """Stopping a scheduler that was never started returns ``False``."""
        result = await task_scheduler.stop()
        assert result is False

    def test_configure_daily_kline_job(self, task_scheduler):
        """The daily K-line update job is registered as a cron job at 18:00."""
        with patch.object(task_scheduler, "_add_job") as mock_add_job:
            task_scheduler._configure_daily_kline_job()

            mock_add_job.assert_called_once()
            call_args = mock_add_job.call_args[0]
            assert call_args[0] == "daily_kline_update"
            assert call_args[1] == "cron"
            assert call_args[2] == task_scheduler._update_daily_kline_data
            assert call_args[3] == {"hour": 18, "minute": 0}  # 18:00

    def test_configure_weekly_financial_job(self, task_scheduler):
        """The weekly financial update job is registered as a cron job at 20:00."""
        with patch.object(task_scheduler, "_add_job") as mock_add_job:
            task_scheduler._configure_weekly_financial_job()

            mock_add_job.assert_called_once()
            call_args = mock_add_job.call_args[0]
            assert call_args[0] == "weekly_financial_update"
            assert call_args[1] == "cron"
            assert call_args[2] == task_scheduler._update_financial_data
            # NOTE(review): the original comment described this as "Sunday 20:00",
            # but with APScheduler-style cron numbering day_of_week=0 means
            # Monday. Confirm which weekday is intended.
            assert call_args[3] == {"day_of_week": 0, "hour": 20, "minute": 0}

    def test_configure_monthly_basic_job(self, task_scheduler):
        """The monthly basic-info update job is registered as a cron job."""
        with patch.object(task_scheduler, "_add_job") as mock_add_job:
            task_scheduler._configure_monthly_basic_job()

            mock_add_job.assert_called_once()
            call_args = mock_add_job.call_args[0]
            assert call_args[0] == "monthly_basic_update"
            assert call_args[1] == "cron"
            assert call_args[2] == task_scheduler._update_stock_basic_info
            # 22:00 on the 1st of each month.
            assert call_args[3] == {"day": 1, "hour": 22, "minute": 0}

    def test_configure_daily_health_check_job(self, task_scheduler):
        """The daily health-check job is registered as a cron job at 09:00."""
        with patch.object(task_scheduler, "_add_job") as mock_add_job:
            task_scheduler._configure_daily_health_check_job()

            mock_add_job.assert_called_once()
            call_args = mock_add_job.call_args[0]
            assert call_args[0] == "daily_health_check"
            assert call_args[1] == "cron"
            assert call_args[2] == task_scheduler._health_check
            assert call_args[3] == {"hour": 9, "minute": 0}  # 09:00

    @pytest.mark.asyncio
    async def test_update_daily_kline_data_success(self, task_scheduler):
        """The K-line update returns ``True`` when fetching and saving both succeed."""
        mock_data = [
            {
                "code": "000001",
                "date": "2024-01-15",
                "open": 10.5,
                "close": 10.8,
            }
        ]
        with patch.object(
            task_scheduler.data_manager, "get_daily_kline_data", return_value=mock_data
        ):
            with patch.object(
                task_scheduler.stock_repo, "save_daily_kline_data", return_value=True
            ):
                result = await task_scheduler._update_daily_kline_data()

        assert result is True

    @pytest.mark.asyncio
    async def test_update_daily_kline_data_failure(self, task_scheduler):
        """The K-line update returns ``False`` when the data fetch raises."""
        with patch.object(
            task_scheduler.data_manager,
            "get_daily_kline_data",
            side_effect=Exception("API错误"),
        ):
            result = await task_scheduler._update_daily_kline_data()

        assert result is False

    @pytest.mark.asyncio
    async def test_update_financial_data_success(self, task_scheduler):
        """The financial update returns ``True`` when fetching and saving both succeed."""
        mock_data = [
            {
                "code": "000001",
                "report_date": "2023-12-31",
                "eps": 1.5,
            }
        ]
        with patch.object(
            task_scheduler.data_manager, "get_financial_report", return_value=mock_data
        ):
            with patch.object(
                task_scheduler.stock_repo, "save_financial_report_data", return_value=True
            ):
                result = await task_scheduler._update_financial_data()

        assert result is True

    @pytest.mark.asyncio
    async def test_update_stock_basic_info_success(self, task_scheduler):
        """The basic-info update returns ``True`` when fetching and saving both succeed."""
        mock_data = [
            {
                "code": "000001",
                "name": "平安银行",
                "market": "主板",
            }
        ]
        with patch.object(
            task_scheduler.data_manager, "get_stock_basic_info", return_value=mock_data
        ):
            with patch.object(
                task_scheduler.stock_repo, "save_stock_basic_info", return_value=True
            ):
                result = await task_scheduler._update_stock_basic_info()

        assert result is True

    @pytest.mark.asyncio
    async def test_health_check_success(self, task_scheduler):
        """The health check returns ``True`` when both patched lookups return lists."""
        with patch.object(
            task_scheduler.data_manager, "get_stock_basic_info", return_value=[]
        ):
            with patch.object(
                task_scheduler.stock_repo, "get_stock_basic_info", return_value=[]
            ):
                result = await task_scheduler._health_check()

        assert result is True

    @pytest.mark.asyncio
    async def test_get_job_status(self, task_scheduler):
        """A job added to a running scheduler appears in the job status."""
        # start() is a coroutine (it is awaited in the other tests); without
        # the await the scheduler would never actually be started.
        with patch.object(task_scheduler, "_configure_jobs"):
            await task_scheduler.start()

        try:
            # Register a throwaway job scheduled one hour from now.
            def _noop_job():
                pass

            task_scheduler._add_job(
                "test_job",
                "date",
                _noop_job,
                {"run_date": datetime.now() + timedelta(hours=1)},
            )

            status = task_scheduler.get_job_status()

            assert "test_job" in status
            assert status["test_job"]["next_run_time"] is not None
        finally:
            # Stop the scheduler so it does not outlive this test.
            await task_scheduler.stop()

    @pytest.mark.asyncio
    async def test_remove_job_success(self, task_scheduler):
        """Removing an existing job returns ``True`` and drops it from the status."""
        # start() is a coroutine and must be awaited for the scheduler to run.
        with patch.object(task_scheduler, "_configure_jobs"):
            await task_scheduler.start()

        try:
            # Register a throwaway job scheduled one hour from now.
            def _noop_job():
                pass

            task_scheduler._add_job(
                "test_job",
                "date",
                _noop_job,
                {"run_date": datetime.now() + timedelta(hours=1)},
            )

            result = task_scheduler.remove_job("test_job")
            assert result is True

            # The removed job must no longer be reported.
            status = task_scheduler.get_job_status()
            assert "test_job" not in status
        finally:
            # Stop the scheduler so it does not outlive this test.
            await task_scheduler.stop()

    @pytest.mark.asyncio
    async def test_remove_job_not_found(self, task_scheduler):
        """Removing a job that does not exist returns ``False``."""
        # start() is a coroutine and must be awaited for the scheduler to run.
        with patch.object(task_scheduler, "_configure_jobs"):
            await task_scheduler.start()

        try:
            result = task_scheduler.remove_job("nonexistent_job")
            assert result is False
        finally:
            # Stop the scheduler so it does not outlive this test.
            await task_scheduler.stop()
class TestIntegration:
    """Integration-style tests for the processor and the scheduler."""

    def test_data_processing_pipeline(self):
        """A raw stock row passes through ``process_stock_basic_info`` end to end.

        The test awaits nothing, so it is a plain synchronous test and does
        not need an event loop.
        """
        processor = DataProcessor()

        # Simulated raw input as delivered by the data source.
        raw_stock_data = [
            {
                "代码": "000001",
                "名称": "平安银行",
                "市场": "主板",
                "行业": "银行",
                "地区": "广东",
                "上市日期": "1991-04-03",
            }
        ]

        processed_data = processor.process_stock_basic_info(raw_stock_data)

        # Verify the processed record.
        assert len(processed_data) == 1
        record = processed_data[0]
        assert record["code"] == "000001"
        assert record["name"] == "平安银行"
        assert record["market"] == "主板"
        assert record["industry"] == "银行"
        assert record["area"] == "广东"
        assert record["ipo_date"] == "1991-04-03"

    @pytest.mark.asyncio
    async def test_scheduler_integration(self):
        """The scheduler can be started and then stopped again."""
        scheduler = TaskScheduler()

        # Job configuration is patched out for the whole start/stop cycle.
        with patch.object(scheduler, "_configure_jobs"):
            started = await scheduler.start()
            try:
                assert started is True
                assert scheduler.is_running is True
            finally:
                # Always stop, even when a start-up assertion fails, so the
                # scheduler does not outlive this test.
                stopped = await scheduler.stop()

            assert stopped is True
            assert scheduler.is_running is False